HBASE-9857 Blockcache prefetch option

This commit is contained in:
Andrew Purtell 2014-05-22 10:17:39 -07:00
parent 53513dcb45
commit 58818496da
25 changed files with 517 additions and 122 deletions

View File

@ -88,6 +88,13 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String CACHE_INDEX_ON_WRITE = "CACHE_INDEX_ON_WRITE";
public static final String CACHE_BLOOMS_ON_WRITE = "CACHE_BLOOMS_ON_WRITE";
public static final String EVICT_BLOCKS_ON_CLOSE = "EVICT_BLOCKS_ON_CLOSE";
/**
* Key for the PREFETCH_BLOCKS_ON_OPEN attribute.
* If set, all INDEX, BLOOM, and DATA blocks of HFiles belonging to this
* family will be loaded into the cache as soon as the file is opened. These
* loads will not count as cache misses.
*/
public static final String PREFETCH_BLOCKS_ON_OPEN = "PREFETCH_BLOCKS_ON_OPEN";
/**
* Size of storefile/hfile 'blocks'. Default is {@link #DEFAULT_BLOCKSIZE}.
@ -207,6 +214,11 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
*/
public static final boolean DEFAULT_COMPRESS_TAGS = true;
/*
* Default setting for whether to prefetch blocks into the blockcache on open.
*/
public static final boolean DEFAULT_PREFETCH_BLOCKS_ON_OPEN = false;
private final static Map<String, String> DEFAULT_VALUES
= new HashMap<String, String>();
private final static Set<ImmutableBytesWritable> RESERVED_KEYWORDS
@ -227,6 +239,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE));
DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE));
DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE));
DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN));
for (String s : DEFAULT_VALUES.keySet()) {
RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
}
@ -933,6 +946,25 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
return setValue(EVICT_BLOCKS_ON_CLOSE, Boolean.toString(value));
}
/**
* @return true if we should prefetch blocks into the blockcache on open
*/
public boolean shouldPrefetchBlocksOnOpen() {
String value = getValue(PREFETCH_BLOCKS_ON_OPEN);
if (value != null) {
return Boolean.valueOf(value).booleanValue();
}
return DEFAULT_PREFETCH_BLOCKS_ON_OPEN;
}
/**
* @param value true if we should prefetch blocks into the blockcache on open
* @return this (for chained invocation)
*/
public HColumnDescriptor setPrefetchBlocksOnOpen(boolean value) {
return setValue(PREFETCH_BLOCKS_ON_OPEN, Boolean.toString(value));
}
/**
* @see java.lang.Object#toString()
*/

View File

@ -51,10 +51,11 @@ public interface BlockCache {
* @param caching Whether this request has caching enabled (used for stats)
* @param repeat Whether this is a repeat lookup for the same block
* (used to avoid double counting cache misses when doing double-check locking)
* @param updateCacheMetrics Whether to update cache metrics or not
* @return Block or null if block is not in 2 cache.
* @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType, DataBlockEncoding)
*/
Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat);
Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics);
/**
* Evict block from cache.

View File

@ -146,6 +146,13 @@ public class CacheConfig {
public static final String SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY =
"hbase.offheapcache.percentage";
/**
* Configuration key to prefetch all blocks of a given file into the block cache
* when the file is opened.
*/
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY =
"hbase.rs.prefetchblocksonopen";
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@ -155,6 +162,7 @@ public class CacheConfig {
public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_COMPRESSED_CACHE = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
/** Local reference to the block cache, null if completely disabled */
private final BlockCache blockCache;
@ -185,6 +193,9 @@ public class CacheConfig {
/** Whether data blocks should be stored in compressed form in the cache */
private final boolean cacheCompressed;
/** Whether data blocks should be prefetched into the cache */
private final boolean prefetchOnOpen;
/**
* Create a cache configuration using the specified configuration object and
* family descriptor.
@ -205,7 +216,9 @@ public class CacheConfig {
DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY,
DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE)
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen()
);
}
@ -226,7 +239,8 @@ public class CacheConfig {
DEFAULT_CACHE_BLOOMS_ON_WRITE),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
DEFAULT_COMPRESSED_CACHE)
DEFAULT_COMPRESSED_CACHE),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN)
);
}
@ -242,12 +256,13 @@ public class CacheConfig {
* @param cacheBloomsOnWrite whether blooms should be cached on write
* @param evictOnClose whether blocks should be evicted when HFile is closed
* @param cacheCompressed whether to store blocks as compressed in the cache
* @param prefetchOnOpen whether to prefetch blocks upon open
*/
CacheConfig(final BlockCache blockCache,
final boolean cacheDataOnRead, final boolean inMemory,
final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
final boolean cacheBloomsOnWrite, final boolean evictOnClose,
final boolean cacheCompressed) {
final boolean cacheCompressed, final boolean prefetchOnOpen) {
this.blockCache = blockCache;
this.cacheDataOnRead = cacheDataOnRead;
this.inMemory = inMemory;
@ -256,6 +271,7 @@ public class CacheConfig {
this.cacheBloomsOnWrite = cacheBloomsOnWrite;
this.evictOnClose = evictOnClose;
this.cacheCompressed = cacheCompressed;
this.prefetchOnOpen = prefetchOnOpen;
LOG.info(this);
}
@ -267,7 +283,7 @@ public class CacheConfig {
this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
cacheConf.cacheCompressed);
cacheConf.cacheCompressed, cacheConf.prefetchOnOpen);
}
/**
@ -303,7 +319,10 @@ public class CacheConfig {
boolean shouldCache = isBlockCacheEnabled()
&& (cacheDataOnRead ||
category == BlockCategory.INDEX ||
category == BlockCategory.BLOOM);
category == BlockCategory.BLOOM ||
(prefetchOnOpen &&
(category != BlockCategory.META &&
category != BlockCategory.UNKNOWN)));
return shouldCache;
}
@ -371,6 +390,13 @@ public class CacheConfig {
return isBlockCacheEnabled() && this.cacheCompressed;
}
/**
* @return true if blocks should be prefetched into the cache on open, false if not
*/
public boolean shouldPrefetchOnOpen() {
return isBlockCacheEnabled() && this.prefetchOnOpen;
}
@Override
public String toString() {
if (!isBlockCacheEnabled()) {
@ -382,7 +408,8 @@ public class CacheConfig {
", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() +
", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() +
", cacheEvictOnClose=" + shouldEvictOnClose() +
", cacheCompressed=" + shouldCacheCompressed();
", cacheCompressed=" + shouldCacheCompressed() +
", prefetchOnOpen=" + shouldPrefetchOnOpen();
}
// Static block cache reference and methods
@ -476,4 +503,4 @@ public class CacheConfig {
}
return GLOBAL_BLOCK_CACHE_INSTANCE;
}
}
}

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
* CombinedBlockCache is an abstraction layer that combines
* {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
* to cache bloom blocks and index blocks. The larger bucketCache is used to
* cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean)}, boolean, boolean) reads
* cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
* first from the smaller lruCache before looking for the block in the bucketCache. Blocks evicted
* from lruCache are put into the bucket cache.
* Metrics are the combined size and hits and misses of both caches.
@ -72,13 +72,13 @@ public class CombinedBlockCache implements BlockCache, HeapSize {
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean repeat) {
boolean repeat, boolean updateCacheMetrics) {
// TODO: is there a hole here, or just awkwardness since in the lruCache getBlock
// we end up calling bucketCache.getBlock.
if (lruCache.containsBlock(cacheKey)) {
return lruCache.getBlock(cacheKey, caching, repeat);
return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
}
return bucketCache.getBlock(cacheKey, caching, repeat);
return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
}
@Override

View File

@ -91,22 +91,25 @@ public class DoubleBlockCache implements ResizableBlockCache, HeapSize {
}
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
Cacheable cachedBlock;
if ((cachedBlock = onHeapCache.getBlock(cacheKey, caching, repeat)) != null) {
stats.hit(caching);
if ((cachedBlock = onHeapCache.getBlock(cacheKey, caching, repeat,
updateCacheMetrics)) != null) {
if (updateCacheMetrics) stats.hit(caching);
return cachedBlock;
} else if ((cachedBlock = offHeapCache.getBlock(cacheKey, caching, repeat)) != null) {
} else if ((cachedBlock = offHeapCache.getBlock(cacheKey, caching, repeat,
updateCacheMetrics)) != null) {
if (caching) {
onHeapCache.cacheBlock(cacheKey, cachedBlock);
}
stats.hit(caching);
if (updateCacheMetrics) stats.hit(caching);
return cachedBlock;
}
if (!repeat) stats.miss(caching);
if (!repeat && updateCacheMetrics) stats.miss(caching);
return null;
}

View File

@ -459,7 +459,8 @@ public class HFile {
*/
HFileBlock readBlock(long offset, long onDiskBlockSize,
boolean cacheBlock, final boolean pread, final boolean isCompaction,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding)
final boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding)
throws IOException;
}

View File

@ -258,7 +258,7 @@ public class HFileBlockIndex {
expectedBlockType = BlockType.DATA;
}
block = cachingBlockReader.readBlock(currentOffset,
currentOnDiskSize, shouldCache, pread, isCompaction,
currentOnDiskSize, shouldCache, pread, isCompaction, true,
expectedBlockType, expectedDataBlockEncoding);
}
@ -337,7 +337,7 @@ public class HFileBlockIndex {
// Caching, using pread, assuming this is not a compaction.
HFileBlock midLeafBlock = cachingBlockReader.readBlock(
midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false,
midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
BlockType.LEAF_INDEX, null);
ByteBuffer b = midLeafBlock.getBufferWithoutHeader();

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.io.WritableUtils;
import org.htrace.Trace;
import org.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
/**
* {@link HFile} reader for version 2.
*/
@ -116,7 +118,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
* @param hfs
* @param conf
*/
public HFileReaderV2(Path path, FixedFileTrailer trailer,
public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
final HFileSystem hfs, final Configuration conf) throws IOException {
super(path, trailer, size, cacheConf, hfs, conf);
@ -177,6 +179,42 @@ public class HFileReaderV2 extends AbstractHFileReader {
while ((b = blockIter.nextBlock()) != null) {
loadOnOpenBlocks.add(b);
}
// Prefetch file blocks upon open if requested
if (cacheConf.shouldPrefetchOnOpen()) {
PrefetchExecutor.request(path, new Runnable() {
public void run() {
try {
long offset = 0;
long end = fileSize - getTrailer().getTrailerSize();
HFileBlock prevBlock = null;
while (offset < end) {
if (Thread.interrupted()) {
break;
}
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
null, null);
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
}
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
LOG.trace("Exception encountered while prefetching " + path + ":", e);
}
} catch (Exception e) {
// Other exceptions are interesting
LOG.warn("Exception encountered while prefetching " + path + ":", e);
} finally {
PrefetchExecutor.complete(path);
}
}
});
}
}
protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
@ -212,13 +250,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
boolean isCompaction, BlockType expectedBlockType,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
if (cacheConf.isBlockCacheEnabled()) {
BlockCache cache = cacheConf.getBlockCache();
HFileBlock cachedBlock =
(HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock);
HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
updateCacheMetrics);
if (cachedBlock != null) {
validateBlockType(cachedBlock, expectedBlockType);
@ -297,7 +335,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
cacheBlock &= cacheConf.shouldCacheDataOnRead();
if (cacheConf.isBlockCacheEnabled()) {
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, false,
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
BlockType.META, null);
if (cachedBlock != null) {
// Return a distinct 'shallow copy' of the block,
@ -348,7 +386,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
final boolean cacheBlock, boolean pread, final boolean isCompaction,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding)
boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding)
throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException("Block index not loaded");
@ -382,12 +421,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
// Try and get the block from the block cache. If the useLock variable is true then this
// is the second time through the loop and it should not be counted as a block cache miss.
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
expectedBlockType, expectedDataBlockEncoding);
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
validateBlockType(cachedBlock, expectedBlockType);
if (cachedBlock.getBlockType().isData()) {
HFile.dataBlockReadCnt.incrementAndGet();
if (updateCacheMetrics) {
HFile.dataBlockReadCnt.incrementAndGet();
}
// Validate encoding type for data blocks. We include encoding
// type in the cache key, and we expect it to match on a cache hit.
if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
@ -422,7 +462,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
}
if (hfileBlock.getBlockType().isData()) {
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
HFile.dataBlockReadCnt.incrementAndGet();
}
@ -493,6 +533,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
public void close(boolean evictOnClose) throws IOException {
PrefetchExecutor.cancel(path);
if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
if (LOG.isTraceEnabled()) {
@ -644,7 +685,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
// figure out the size.
seekToBlock = reader.readBlock(previousBlockOffset,
seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
pread, isCompaction, BlockType.DATA, getEffectiveDataBlockEncoding());
pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
// TODO shortcut: seek forward in this block to the last key of the
// block.
}
@ -680,7 +721,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
curBlock = reader.readBlock(curBlock.getOffset()
+ curBlock.getOnDiskSizeWithHeader(),
curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
isCompaction, null, getEffectiveDataBlockEncoding());
isCompaction, true, null, getEffectiveDataBlockEncoding());
} while (!curBlock.getBlockType().isData());
return curBlock;
@ -844,7 +885,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction, BlockType.DATA, getEffectiveDataBlockEncoding());
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
if (block.getOffset() < 0) {
throw new IOException("Invalid block offset: " + block.getOffset());
}
@ -1139,7 +1180,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction, BlockType.DATA, getEffectiveDataBlockEncoding());
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
if (block.getOffset() < 0) {
throw new IOException("Invalid block offset: " + block.getOffset());
}
@ -1286,4 +1327,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
public HFileContext getFileContext() {
return hfileContext;
}
/**
* Returns false if block prefetching was requested for this file and has
* not completed, true otherwise
*/
@VisibleForTesting
boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}
}

View File

@ -69,7 +69,7 @@ public class HFileReaderV3 extends HFileReaderV2 {
* @param conf
* Configuration
*/
public HFileReaderV3(Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
final long size, final CacheConfig cacheConf, final HFileSystem hfs,
final Configuration conf) throws IOException {
super(path, trailer, fsdis, size, cacheConf, hfs, conf);

View File

@ -377,19 +377,21 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
* @param caching true if the caller caches blocks on cache misses
* @param repeat Whether this is a repeat lookup for the same block
* (used to avoid double counting cache misses when doing double-check locking)
* @param updateCacheMetrics Whether to update cache metrics or not
* @return buffer of specified cache key, or null if not in cache
* @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType, DataBlockEncoding)
*/
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
CachedBlock cb = map.get(cacheKey);
if (cb == null) {
if (!repeat) stats.miss(caching);
if (victimHandler != null)
return victimHandler.getBlock(cacheKey, caching, repeat);
if (!repeat && updateCacheMetrics) stats.miss(caching);
if (victimHandler != null) {
return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
}
return null;
}
stats.hit(caching);
if (updateCacheMetrics) stats.hit(caching);
cb.access(count.incrementAndGet());
return cb.getBuffer();
}

View File

@ -0,0 +1,122 @@
package org.apache.hadoop.hbase.io.hfile;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
public class PrefetchExecutor {
private static final Log LOG = LogFactory.getLog(PrefetchExecutor.class);
/** Futures for tracking block prefetch activity */
private static final Map<Path,Future<?>> prefetchFutures =
new ConcurrentSkipListMap<Path,Future<?>>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
Configuration conf = HBaseConfiguration.create();
// 1s here for tests, consider 30s in hbase-default.xml
// Set to 0 for no delay
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("hfile-prefetch-" + System.currentTimeMillis());
t.setDaemon(true);
return t;
}
});
}
private static final Random RNG = new Random();
// TODO: We want HFile, which is where the blockcache lives, to handle
// prefetching of file blocks but the Store level is where path convention
// knowledge should be contained
private static final Pattern prefetchPathExclude =
Pattern.compile(
"(" +
Path.SEPARATOR_CHAR +
HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") +
Path.SEPARATOR_CHAR +
")|(" +
Path.SEPARATOR_CHAR +
HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") +
Path.SEPARATOR_CHAR +
")");
public static void request(Path path, Runnable runnable) {
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) +
(prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat()));
} else {
delay = 0;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
}
prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay,
TimeUnit.MILLISECONDS));
} catch (RejectedExecutionException e) {
prefetchFutures.remove(path);
LOG.warn("Prefetch request rejected for " + path);
}
}
}
public static void complete(Path path) {
prefetchFutures.remove(path);
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch completed for " + path);
}
}
public static void cancel(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch cancelled for " + path);
}
}
}
public static boolean isCompleted(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
return future.isDone();
}
return true;
}
}

View File

@ -347,15 +347,17 @@ public class BucketCache implements BlockCache, HeapSize {
* @param key block's cache key
* @param caching true if the caller caches blocks on cache misses
* @param repeat Whether this is a repeat lookup for the same block
* @param updateCacheMetrics Whether we should update cache metrics or not
* @return buffer of specified cache key, or null if not in cache
*/
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
if (!cacheEnabled)
return null;
RAMQueueEntry re = ramCache.get(key);
if (re != null) {
cacheStats.hit(caching);
if (updateCacheMetrics) cacheStats.hit(caching);
re.access(accessCount.incrementAndGet());
return re.getData();
}
@ -375,8 +377,10 @@ public class BucketCache implements BlockCache, HeapSize {
Cacheable cachedBlock = bucketEntry.deserializerReference(
deserialiserMap).deserialize(bb, true);
long timeTaken = System.nanoTime() - start;
cacheStats.hit(caching);
cacheStats.ioHit(timeTaken);
if (updateCacheMetrics) {
cacheStats.hit(caching);
cacheStats.ioHit(timeTaken);
}
bucketEntry.access(accessCount.incrementAndGet());
if (this.ioErrorStartTime > 0) {
ioErrorStartTime = -1;
@ -392,7 +396,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
}
if(!repeat) cacheStats.miss(caching);
if (!repeat && updateCacheMetrics) cacheStats.miss(caching);
return null;
}
@ -1192,4 +1196,4 @@ public class BucketCache implements BlockCache, HeapSize {
writerThread.join();
}
}
}
}

View File

@ -149,14 +149,15 @@ public class SingleSizeCache implements BlockCache, HeapSize {
}
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
CacheablePair contentBlock = backingMap.get(key);
if (contentBlock == null) {
if (!repeat) stats.miss(caching);
if (!repeat && updateCacheMetrics) stats.miss(caching);
return null;
}
stats.hit(caching);
if (updateCacheMetrics) stats.hit(caching);
// If lock cannot be obtained, that means we're undergoing eviction.
try {
contentBlock.recentlyAccessed.set(System.nanoTime());

View File

@ -245,19 +245,20 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize {
*
* @return buffer of specified block name, or null if not in cache
*/
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
SingleSizeCache cachedBlock = backingStore.get(key);
if (cachedBlock == null) {
if (!repeat) stats.miss(caching);
return null;
}
Cacheable contentBlock = cachedBlock.getBlock(key, caching, false);
Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics);
if (contentBlock != null) {
stats.hit(caching);
if (updateCacheMetrics) stats.hit(caching);
} else if (!repeat) {
stats.miss(caching);
if (updateCacheMetrics) stats.miss(caching);
}
return contentBlock;
}

View File

@ -98,7 +98,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
try {
// We cache the block and use a positional read.
bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
index.getRootBlockDataSize(block), true, true, false,
index.getRootBlockDataSize(block), true, true, false, true,
BlockType.BLOOM_CHUNK, null);
} catch (IOException ex) {
// The Bloom filter is broken, turn it off.

View File

@ -93,12 +93,12 @@ public class CacheTestUtils {
}
toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
false, false);
false, false, true);
if (retrievedBlock != null) {
assertEquals(ourBlock.block, retrievedBlock);
toBeTested.evictBlock(ourBlock.blockName);
hits.incrementAndGet();
assertNull(toBeTested.getBlock(ourBlock.blockName, false, false));
assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
} else {
miss.incrementAndGet();
}
@ -126,7 +126,7 @@ public class CacheTestUtils {
HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize);
// Confirm empty
for (HFileBlockPair block : blocks) {
assertNull(toBeTested.getBlock(block.blockName, true, false));
assertNull(toBeTested.getBlock(block.blockName, true, false, true));
}
// Add blocks
@ -139,7 +139,7 @@ public class CacheTestUtils {
// MapMaker makes no guarantees when it will evict, so neither can we.
for (HFileBlockPair block : blocks) {
HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false);
HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
if (buf != null) {
assertEquals(block.block, buf);
}
@ -150,7 +150,7 @@ public class CacheTestUtils {
for (HFileBlockPair block : blocks) {
try {
if (toBeTested.getBlock(block.blockName, true, false) != null) {
if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
toBeTested.cacheBlock(block.blockName, block.block);
if (!(toBeTested instanceof BucketCache)) {
// BucketCache won't throw exception when caching already cached
@ -184,7 +184,7 @@ public class CacheTestUtils {
@Override
public void doAnAction() throws Exception {
ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
.getBlock(key, false, false);
.getBlock(key, false, false, true);
assertArrayEquals(buf, returned.buf);
totalQueries.incrementAndGet();
}
@ -223,7 +223,7 @@ public class CacheTestUtils {
final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
.getBlock(key, true, false);
.getBlock(key, true, false, true);
if (gotBack != null) {
assertArrayEquals(gotBack.buf, bac.buf);
} else {

View File

@ -245,10 +245,10 @@ public class TestCacheOnWrite {
// Flags: don't cache the block, use pread, this is not a compaction.
// Also, pass null for expected block type to avoid checking it.
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
false, null, encodingInCache);
false, true, null, encodingInCache);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true, false) != null;
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
if (shouldBeCached != isCached) {
throw new AssertionError(

View File

@ -166,7 +166,8 @@ public class TestHFileBlockIndex {
@Override
public HFileBlock readBlock(long offset, long onDiskSize,
boolean cacheBlock, boolean pread, boolean isCompaction,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding)
boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding)
throws IOException {
if (offset == prevOffset && onDiskSize == prevOnDiskSize &&
pread == prevPread) {

View File

@ -83,7 +83,7 @@ public class TestHFileDataBlockEncoder {
BlockCacheKey cacheKey = new BlockCacheKey("test", 0);
blockCache.cacheBlock(cacheKey, cacheBlock);
HeapSize heapSize = blockCache.getBlock(cacheKey, false, false);
HeapSize heapSize = blockCache.getBlock(cacheKey, false, false, true);
assertTrue(heapSize instanceof HFileBlock);
HFileBlock returnedBlock = (HFileBlock) heapSize;;

View File

@ -106,7 +106,7 @@ public class TestLruBlockCache {
// Confirm empty
for (CachedItem block : blocks) {
assertTrue(cache.getBlock(block.cacheKey, true, false) == null);
assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null);
}
// Add blocks
@ -120,7 +120,7 @@ public class TestLruBlockCache {
// Check if all blocks are properly cached and retrieved
for (CachedItem block : blocks) {
HeapSize buf = cache.getBlock(block.cacheKey, true, false);
HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
assertTrue(buf != null);
assertEquals(buf.heapSize(), block.heapSize());
}
@ -139,7 +139,7 @@ public class TestLruBlockCache {
// Check if all blocks are properly cached and retrieved
for (CachedItem block : blocks) {
HeapSize buf = cache.getBlock(block.cacheKey, true, false);
HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
assertTrue(buf != null);
assertEquals(buf.heapSize(), block.heapSize());
}
@ -184,9 +184,9 @@ public class TestLruBlockCache {
(maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
// All blocks except block 0 should be in the cache
assertTrue(cache.getBlock(blocks[0].cacheKey, true, false) == null);
assertTrue(cache.getBlock(blocks[0].cacheKey, true, false, true) == null);
for(int i=1;i<blocks.length;i++) {
assertEquals(cache.getBlock(blocks[i].cacheKey, true, false),
assertEquals(cache.getBlock(blocks[i].cacheKey, true, false, true),
blocks[i]);
}
}
@ -208,7 +208,7 @@ public class TestLruBlockCache {
for (CachedItem block : multiBlocks) {
cache.cacheBlock(block.cacheKey, block);
expectedCacheSize += block.cacheBlockHeapSize();
assertEquals(cache.getBlock(block.cacheKey, true, false), block);
assertEquals(cache.getBlock(block.cacheKey, true, false, true), block);
}
// Add the single blocks (no get)
@ -238,14 +238,14 @@ public class TestLruBlockCache {
// This test makes multi go barely over its limit, in-memory
// empty, and the rest in single. Two single evictions and
// one multi eviction expected.
assertTrue(cache.getBlock(singleBlocks[0].cacheKey, true, false) == null);
assertTrue(cache.getBlock(multiBlocks[0].cacheKey, true, false) == null);
assertTrue(cache.getBlock(singleBlocks[0].cacheKey, true, false, true) == null);
assertTrue(cache.getBlock(multiBlocks[0].cacheKey, true, false, true) == null);
// And all others to be cached
for(int i=1;i<4;i++) {
assertEquals(cache.getBlock(singleBlocks[i].cacheKey, true, false),
assertEquals(cache.getBlock(singleBlocks[i].cacheKey, true, false, true),
singleBlocks[i]);
assertEquals(cache.getBlock(multiBlocks[i].cacheKey, true, false),
assertEquals(cache.getBlock(multiBlocks[i].cacheKey, true, false, true),
multiBlocks[i]);
}
}
@ -283,7 +283,7 @@ public class TestLruBlockCache {
// Add and get multi blocks
cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
cache.getBlock(multiBlocks[i].cacheKey, true, false);
cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
// Add memory blocks as such
cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true);
@ -305,10 +305,10 @@ public class TestLruBlockCache {
assertEquals(1, cache.getEvictedCount());
// Verify oldest single block is the one evicted
assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false, true));
// Change the oldest remaining single block to a multi
cache.getBlock(singleBlocks[1].cacheKey, true, false);
cache.getBlock(singleBlocks[1].cacheKey, true, false, true);
// Insert another single block
cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]);
@ -318,7 +318,7 @@ public class TestLruBlockCache {
assertEquals(2, cache.getEvictedCount());
// Oldest multi block should be evicted now
assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false, true));
// Insert another memory block
cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true);
@ -328,7 +328,7 @@ public class TestLruBlockCache {
assertEquals(3, cache.getEvictedCount());
// Oldest memory block should be evicted now
assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false, true));
// Add a block that is twice as big (should force two evictions)
CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
@ -339,12 +339,12 @@ public class TestLruBlockCache {
assertEquals(6, cache.getEvictedCount());
// Expect three remaining singles to be evicted
assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false, true));
// Make the big block a multi block
cache.getBlock(bigBlocks[0].cacheKey, true, false);
cache.getBlock(bigBlocks[0].cacheKey, true, false, true);
// Cache another single big block
cache.cacheBlock(bigBlocks[1].cacheKey, bigBlocks[1]);
@ -354,9 +354,9 @@ public class TestLruBlockCache {
assertEquals(9, cache.getEvictedCount());
// Expect three remaining multis to be evicted
assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false, true));
// Cache a big memory block
cache.cacheBlock(bigBlocks[2].cacheKey, bigBlocks[2], true);
@ -366,9 +366,9 @@ public class TestLruBlockCache {
assertEquals(12, cache.getEvictedCount());
// Expect three remaining in-memory to be evicted
assertEquals(null, cache.getBlock(memoryBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(memoryBlocks[2].cacheKey, true, false));
assertEquals(null, cache.getBlock(memoryBlocks[3].cacheKey, true, false));
assertEquals(null, cache.getBlock(memoryBlocks[1].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(memoryBlocks[2].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(memoryBlocks[3].cacheKey, true, false, true));
}
@Test
@ -401,7 +401,7 @@ public class TestLruBlockCache {
// Add and get multi blocks
cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
cache.getBlock(multiBlocks[i].cacheKey, true, false);
cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
}
// 5th single block
cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]);
@ -417,7 +417,7 @@ public class TestLruBlockCache {
assertEquals(1, cache.getEvictionCount());
assertEquals(1, cache.getEvictedCount());
// Verify oldest single block (index = 0) is the one evicted
assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false, true));
// 2. Insert another memory block, another single evicted, si:mu:me = 3:4:2
cache.cacheBlock(memoryBlocks[1].cacheKey, memoryBlocks[1], true);
@ -425,7 +425,7 @@ public class TestLruBlockCache {
assertEquals(2, cache.getEvictionCount());
assertEquals(2, cache.getEvictedCount());
// Current oldest single block (index = 1) should be evicted now
assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false, true));
// 3. Insert 4 memory blocks, 2 single and 2 multi evicted, si:mu:me = 1:2:6
cache.cacheBlock(memoryBlocks[2].cacheKey, memoryBlocks[2], true);
@ -436,10 +436,10 @@ public class TestLruBlockCache {
assertEquals(6, cache.getEvictionCount());
assertEquals(6, cache.getEvictedCount());
// two oldest single blocks and two oldest multi blocks evicted
assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false, true));
// 4. Insert 3 memory blocks, the remaining 1 single and 2 multi evicted
// si:mu:me = 0:0:9
@ -450,9 +450,9 @@ public class TestLruBlockCache {
assertEquals(9, cache.getEvictionCount());
assertEquals(9, cache.getEvictedCount());
// one oldest single block and two oldest multi blocks evicted
assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[3].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[3].cacheKey, true, false, true));
// 5. Insert one memory block, the oldest memory evicted
// si:mu:me = 0:0:9
@ -461,7 +461,7 @@ public class TestLruBlockCache {
assertEquals(10, cache.getEvictionCount());
assertEquals(10, cache.getEvictedCount());
// oldest memory block evicted
assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false, true));
// 6. Insert one new single block, itself evicted immediately since
// all blocks in cache are memory-type which have higher priority
@ -471,7 +471,7 @@ public class TestLruBlockCache {
assertEquals(11, cache.getEvictionCount());
assertEquals(11, cache.getEvictedCount());
// the single block just cached now evicted (can't evict memory)
assertEquals(null, cache.getBlock(singleBlocks[9].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[9].cacheKey, true, false, true));
}
// test scan resistance
@ -498,7 +498,7 @@ public class TestLruBlockCache {
// Add 5 multi blocks
for (CachedItem block : multiBlocks) {
cache.cacheBlock(block.cacheKey, block);
cache.getBlock(block.cacheKey, true, false);
cache.getBlock(block.cacheKey, true, false, true);
}
// Add 5 single blocks
@ -513,10 +513,10 @@ public class TestLruBlockCache {
assertEquals(4, cache.getEvictedCount());
// Should have been taken off equally from single and multi
assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false, true));
// Let's keep "scanning" by adding single blocks. From here on we only
// expect evictions from the single bucket.
@ -568,7 +568,7 @@ public class TestLruBlockCache {
// Add and get multi blocks
cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
cache.getBlock(multiBlocks[i].cacheKey, true, false);
cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
// Add memory blocks as such
cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true);
@ -588,16 +588,16 @@ public class TestLruBlockCache {
// And the oldest 5 blocks from each category should be gone
for(int i=0;i<5;i++) {
assertEquals(null, cache.getBlock(singleBlocks[i].cacheKey, true, false));
assertEquals(null, cache.getBlock(multiBlocks[i].cacheKey, true, false));
assertEquals(null, cache.getBlock(memoryBlocks[i].cacheKey, true, false));
assertEquals(null, cache.getBlock(singleBlocks[i].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(multiBlocks[i].cacheKey, true, false, true));
assertEquals(null, cache.getBlock(memoryBlocks[i].cacheKey, true, false, true));
}
// And the newest 5 blocks should still be accessible
for(int i=5;i<10;i++) {
assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].cacheKey, true, false));
assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].cacheKey, true, false));
assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].cacheKey, true, false));
assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].cacheKey, true, false, true));
assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].cacheKey, true, false, true));
assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].cacheKey, true, false, true));
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestPrefetch {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 1000;
private static final Random RNG = new Random();
private Configuration conf;
private CacheConfig cacheConf;
private FileSystem fs;
@Before
public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration();
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
fs = HFileSystem.get(conf);
cacheConf = new CacheConfig(conf);
}
@Test(timeout=60000)
public void testPrefetch() throws Exception {
Path storeFile = writeStoreFile();
readStoreFile(storeFile);
}
private void readStoreFile(Path storeFilePath) throws Exception {
// Open the file
HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
storeFilePath, cacheConf, conf);
while (!((HFileReaderV3)reader).prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
// Check that all of the data blocks were preloaded
BlockCache blockCache = cacheConf.getBlockCache();
long offset = 0;
HFileBlock prevBlock = null;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null,
null);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
if (block.getBlockType() == BlockType.DATA ||
block.getBlockType() == BlockType.ROOT_INDEX ||
block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
assertTrue(isCached);
}
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
}
}
private Path writeStoreFile() throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestPrefetch");
HFileContext meta = new HFileContextBuilder()
.withBlockSize(DATA_BLOCK_SIZE)
.build();
StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir)
.withComparator(KeyValue.COMPARATOR)
.withFileContext(meta)
.build();
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {
byte[] k = TestHFileWriterV2.randomOrderedKey(RNG, i);
byte[] v = TestHFileWriterV2.randomValue(RNG);
int cfLen = RNG.nextInt(k.length - rowLen + 1);
KeyValue kv = new KeyValue(
k, 0, rowLen,
k, rowLen, cfLen,
k, rowLen + cfLen, k.length - rowLen - cfLen,
RNG.nextLong(),
generateKeyType(RNG),
v, 0, v.length);
sfw.append(kv);
}
sfw.close();
return sfw.getPath();
}
public static KeyValue.Type generateKeyType(Random rand) {
if (rand.nextBoolean()) {
// Let's make half of KVs puts.
return KeyValue.Type.Put;
} else {
KeyValue.Type keyType =
KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum)
{
throw new RuntimeException("Generated an invalid key type: " + keyType
+ ". " + "Probably the layout of KeyValue.Type has changed.");
}
return keyType;
}
}
}

View File

@ -72,7 +72,7 @@ public class TestBucketCache {
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf,
boolean inMemory) {
if (super.getBlock(cacheKey, true, false) != null) {
if (super.getBlock(cacheKey, true, false, true) != null) {
throw new RuntimeException("Cached an already cached block");
}
super.cacheBlock(cacheKey, buf, inMemory);
@ -80,7 +80,7 @@ public class TestBucketCache {
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
if (super.getBlock(cacheKey, true, false) != null) {
if (super.getBlock(cacheKey, true, false, true) != null) {
throw new RuntimeException("Cached an already cached block");
}
super.cacheBlock(cacheKey, buf);

View File

@ -234,10 +234,10 @@ public class TestCacheOnWriteInSchema {
// Flags: don't cache the block, use pread, this is not a compaction.
// Also, pass null for expected block type to avoid checking it.
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
false, null, DataBlockEncoding.NONE);
false, true, null, DataBlockEncoding.NONE);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset);
boolean isCached = cache.getBlock(blockCacheKey, true, false) != null;
boolean isCached = cache.getBlock(blockCacheKey, true, false, true) != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
if (shouldBeCached != isCached) {
throw new AssertionError(

View File

@ -279,7 +279,8 @@ public class TestHeapMemoryManager {
}
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
return null;
}

View File

@ -645,6 +645,7 @@ module Hbase
family.setMinVersions(JInteger.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
family.setKeepDeletedCells(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
family.setCompressTags(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESS_TAGS)
family.setPrefetchBlocksOnOpen(JBoolean.valueOf(arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN))) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::PREFETCH_BLOCKS_ON_OPEN)
family.setValue(COMPRESSION_COMPACT, arg.delete(COMPRESSION_COMPACT)) if arg.include?(COMPRESSION_COMPACT)
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
bloomtype = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER).upcase