HBASE-11098 Improve documentation around our blockcache options

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1594413 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2014-05-13 22:55:59 +00:00
parent 5102668714
commit 01f6b9540b
14 changed files with 167 additions and 321 deletions

View File

@@ -56,7 +56,7 @@ public final class ByteBufferArray {
this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+ " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+ bufferCount);
+ bufferCount + ", direct=" + directByteBuffer);
buffers = new ByteBuffer[bufferCount + 1];
locks = new Lock[bufferCount + 1];
for (int i = 0; i <= bufferCount; i++) {

View File

@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.slab.SlabCache;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.util.StringUtils;
@@ -76,14 +77,49 @@ public class CacheConfig {
/**
* Configuration keys for Bucket cache
*/
/**
* Current ioengine options include: heap, offheap, and file:PATH (where PATH is the path
* to the file that will host the file-based cache). See BucketCache#getIOEngineFromName() for
* the list of supported ioengine options.
*
* <p>Set this option and a non-zero {@link BUCKET_CACHE_SIZE_KEY} to enable bucket cache.
*/
public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
/**
* When using bucket cache, this is a float that EITHER represents a percentage of total heap
* memory size to give to the cache (if < 1.0) OR the capacity of the cache in megabytes.
*
* <p>The resultant size is further divided if {@link BUCKET_CACHE_COMBINED_KEY} is set (it is
* set by default; when false, the bucket cache serves as an "L2" cache to the "L1"
* {@link LruBlockCache}). The split is set with the
* {@link BUCKET_CACHE_COMBINED_PERCENTAGE_KEY} float.
*/
public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
/**
* If the chosen ioengine can persist its state across restarts, the path to the file to
* persist to.
*/
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
"hbase.bucketcache.persistent.path";
/**
* Whether the bucket cache is used in league with the lru on-heap block cache (meta blocks
* such as indices and blooms are kept in the lru blockcache and the data blocks in the
* bucket cache).
*/
public static final String BUCKET_CACHE_COMBINED_KEY =
"hbase.bucketcache.combinedcache.enabled";
/**
* A float which designates how much of the overall cache to give to the bucket cache
* and how much to the on-heap lru cache when {@link BUCKET_CACHE_COMBINED_KEY} is set.
*/
public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY =
"hbase.bucketcache.percentage.in.combinedcache";
public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
"hbase.bucketcache.writer.queuelength";
@@ -95,6 +131,19 @@ public class CacheConfig {
public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
/**
* Setting this float to a non-zero value turns on {@link DoubleBlockCache},
* which makes use of the {@link LruBlockCache} and {@link SlabCache}.
*
* The float value, between 0 and 1, is multiplied against the setting for
* <code>-XX:MaxDirectMemorySize</code> to figure what size of offheap allocation to give
* over to slab cache.
*
* Slab cache has been little used and is likely to be deprecated in the near future.
*/
public static final String SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY =
"hbase.offheapcache.percentage";
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -404,7 +453,7 @@ public class CacheConfig {
}
}
LOG.info("Allocating LruBlockCache with maximum size " +
StringUtils.humanReadableInt(lruCacheSize));
StringUtils.humanReadableInt(lruCacheSize) + ", blockSize=" + blockSize);
LruBlockCache lruCache = new LruBlockCache(lruCacheSize, blockSize);
lruCache.setVictimCache(bucketCache);
if (bucketCache != null && combinedWithLru) {

View File

@@ -37,15 +37,15 @@ import org.apache.hadoop.hbase.io.HeapSize;
public interface Cacheable extends HeapSize {
/**
* Returns the length of the ByteBuffer required to serialize the object. If the
* object cannot be serialized, it should also return 0.
* object cannot be serialized, it should return 0.
*
* @return int length in bytes of the serialized form.
* @return int length in bytes of the serialized form or 0 if the object cannot be cached.
*/
int getSerializedLength();
/**
* Serializes its data into destination.
* @param destination Where to serialize to
*/
void serialize(ByteBuffer destination);
@@ -60,5 +60,4 @@ public interface Cacheable extends HeapSize {
* @return the block type of this cached HFile block
*/
BlockType getBlockType();
}
}
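A sketch of how a cache consumer honors the contract documented above (illustrative, not code from this commit; assumes it sits alongside Cacheable in org.apache.hadoop.hbase.io.hfile):

import java.nio.ByteBuffer;

// Illustrative consumer of the Cacheable contract: a serialized length of 0
// means "cannot be serialized", so the block is skipped rather than cached.
public class CacheableConsumerSketch {
  static ByteBuffer trySerialize(Cacheable block) {
    int len = block.getSerializedLength();
    if (len == 0) {
      return null; // not serializable; the caller should not cache this block
    }
    ByteBuffer destination = ByteBuffer.allocate(len);
    block.serialize(destination);
    return destination;
  }
}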

View File

@@ -30,15 +30,14 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
/**
* CombinedBlockCache is an abstraction layer that combines
* {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
* to cache bloom blocks and index blocks , the larger bucketCache is used to
* cache data blocks. getBlock reads first from the smaller lruCache before
* looking for the block in the bucketCache. Metrics are the combined size and
* hits and misses of both caches.
* to cache bloom blocks and index blocks. The larger bucketCache is used to
* cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean)} reads
* first from the smaller lruCache before looking for the block in the bucketCache.
* Metrics are the combined size and hits and misses of both caches.
*
**/
*/
@InterfaceAudience.Private
public class CombinedBlockCache implements BlockCache, HeapSize {
private final LruBlockCache lruCache;
private final BucketCache bucketCache;
private final CombinedCacheStats combinedCacheStats;
@@ -73,6 +72,8 @@ public class CombinedBlockCache implements BlockCache, HeapSize {
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean repeat) {
// TODO: is there a hole here, or just awkwardness since in the lruCache getBlock
// we end up calling bucketCache.getBlock.
if (lruCache.containsBlock(cacheKey)) {
return lruCache.getBlock(cacheKey, caching, repeat);
}
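The hunk above shows only the lru branch of the read path. Per the class comment, the lookup presumably falls through to the bucket cache otherwise; a sketch of the documented order (an assumption, since the rest of the method is not shown in this diff):

// Sketch of the read order the class comment describes; the else branch is
// not shown in this hunk, so this is illustrative rather than verbatim code.
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
  if (lruCache.containsBlock(cacheKey)) {
    return lruCache.getBlock(cacheKey, caching, repeat);
  }
  return bucketCache.getBlock(cacheKey, caching, repeat);
}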

View File

@@ -132,7 +132,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
static final int statThreadPeriod = 60 * 5;
/** Concurrent map (the cache) */
private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;
private final Map<BlockCacheKey,CachedBlock> map;
/** Eviction lock (locked when eviction in process) */
private final ReentrantLock evictionLock = new ReentrantLock(true);
@@ -190,7 +190,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
/** Whether in-memory hfile's data block has higher priority when evicting */
private boolean forceInMemory;
/** Where to send victims (blocks evicted from the cache) */
/** Where to send victims (blocks evicted/missing from the cache) */
// TODO: Fix it so this is not explicit reference to a particular BlockCache implementation.
private BucketCache victimHandler = null;
/**
@@ -382,7 +383,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
CachedBlock cb = map.get(cacheKey);
if(cb == null) {
if (cb == null) {
if (!repeat) stats.miss(caching);
if (victimHandler != null)
return victimHandler.getBlock(cacheKey, caching, repeat);
@@ -926,5 +927,4 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
assert victimHandler == null;
victimHandler = handler;
}
}

View File

@@ -1,139 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
/**
* Simple one RFile soft reference cache.
*/
@InterfaceAudience.Private
public class SimpleBlockCache implements BlockCache {
private static class Ref extends SoftReference<Cacheable> {
public BlockCacheKey blockId;
public Ref(BlockCacheKey blockId, Cacheable block, ReferenceQueue q) {
super(block, q);
this.blockId = blockId;
}
}
private Map<BlockCacheKey,Ref> cache =
new HashMap<BlockCacheKey,Ref>();
private ReferenceQueue q = new ReferenceQueue();
public int dumps = 0;
/**
* Constructor
*/
public SimpleBlockCache() {
super();
}
void processQueue() {
Ref r;
while ( (r = (Ref)q.poll()) != null) {
cache.remove(r.blockId);
dumps++;
}
}
/**
* @return the size
*/
public synchronized long size() {
processQueue();
return cache.size();
}
public synchronized Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
processQueue(); // clear out some crap.
Ref ref = cache.get(cacheKey);
if (ref == null)
return null;
return ref.get();
}
public synchronized void cacheBlock(BlockCacheKey cacheKey, Cacheable block) {
cache.put(cacheKey, new Ref(cacheKey, block, q));
}
public synchronized void cacheBlock(BlockCacheKey cacheKey, Cacheable block,
boolean inMemory) {
cache.put(cacheKey, new Ref(cacheKey, block, q));
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return cache.remove(cacheKey) != null;
}
public void shutdown() {
// noop
}
@Override
public CacheStats getStats() {
// TODO: implement this if we ever actually use this block cache
return null;
}
@Override
public long getFreeSize() {
// TODO: implement this if we ever actually use this block cache
return 0;
}
@Override
public long getCurrentSize() {
// TODO: implement this if we ever actually use this block cache
return 0;
}
@Override
public long getEvictedCount() {
// TODO: implement this if we ever actually use this block cache
return 0;
}
@Override
public int evictBlocksByHfileName(String string) {
throw new UnsupportedOperationException();
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) {
throw new UnsupportedOperationException();
}
@Override
public long getBlockCount() {
// TODO: implement this if we ever actually use this block cache
return 0;
}
}

View File

@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
/**
* This class is used to allocate a block with specified size and free the block
* when evicting. It manages an array of buckets, each bucket is associated with
* a size and caches elements up to this size. For completely empty bucket, this
* a size and caches elements up to this size. For a completely empty bucket, this
* size could be re-specified dynamically.
*
* This class is not thread safe.

View File

@@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.ConcurrentIndex;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
@@ -70,21 +69,21 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* BucketCache uses {@link BucketAllocator} to allocate/free block, and use
* BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
* {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to
* determine whether a given element hit. It could uses memory
* {@link ByteBufferIOEngine} or file {@link FileIOEngine}to store/read the
* block data.
* determine if a given element is in the cache. The bucket cache can use on-heap or
* off-heap memory ({@link ByteBufferIOEngine}) or a file ({@link FileIOEngine}) to
* store/read the block data.
*
* Eviction is using similar algorithm as
* <p>Eviction is via a similar algorithm as used in
* {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
*
* BucketCache could be used as mainly a block cache(see
* {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS and
* fragment by GC.
* <p>BucketCache can be used mainly as a block cache (see
* {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and
* heap fragmentation.
*
* Also could be used as a secondary cache(e.g. using Fusionio to store block)
* to enlarge cache space by
* <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
* blocks) to enlarge cache space via
* {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
*/
@InterfaceAudience.Private
@@ -110,9 +109,9 @@ public class BucketCache implements BlockCache, HeapSize {
IOEngine ioEngine;
// Store the block in this map before writing it to cache
private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
private Map<BlockCacheKey, RAMQueueEntry> ramCache;
// In this map, store the block's meta data like offset, length
private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
private Map<BlockCacheKey, BucketEntry> backingMap;
/**
* Flag if the cache is enabled or not... We shut it off if there are IO
@@ -125,8 +124,6 @@ public class BucketCache implements BlockCache, HeapSize {
new ArrayList<BlockingQueue<RAMQueueEntry>>();
WriterThread writerThreads[];
/** Volatile boolean to track if free space is in process or not */
private volatile boolean freeInProgress = false;
private Lock freeSpaceLock = new ReentrantLock();
@@ -196,7 +193,7 @@ public class BucketCache implements BlockCache, HeapSize {
// Allocate or free space for the block
private BucketAllocator bucketAllocator;
public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum,
int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
@@ -252,7 +249,11 @@ public class BucketCache implements BlockCache, HeapSize {
// Run the statistics thread periodically to print the cache statistics log
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
LOG.info("Started bucket cache");
LOG.info("Started bucket cache; ioengine=" + ioEngineName +
", capacity=" + StringUtils.byteDesc(capacity) +
", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
persistencePath);
}
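The constructor signature above shows how a deploy is parameterized. A hedged sketch (illustrative sizes and counts, not code from this commit) of the secondary-cache deploy described in the class comment:

import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

// Illustrative sketch: an offheap BucketCache deployed as victim/"L2" behind
// LruBlockCache. All sizes and thread counts here are made up.
public class VictimCacheSketch {
  static void wire(LruBlockCache lru) throws IOException {
    long capacity = 1024L * 1024 * 1024; // 1 GB bucket cache
    int blockSize = 64 * 1024;           // 64 KB block size
    BucketCache bucketCache = new BucketCache("offheap", capacity, blockSize,
        3 /* writer threads */, 64 /* writer queue length */, null /* no persistence */);
    lru.setVictimCache(bucketCache); // evicted/missing lru blocks fall through here
  }
}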
/**
@@ -359,7 +360,7 @@ public class BucketCache implements BlockCache, HeapSize {
return re.getData();
}
BucketEntry bucketEntry = backingMap.get(key);
if(bucketEntry!=null) {
if (bucketEntry != null) {
long start = System.nanoTime();
IdLock.Entry lockEntry = null;
try {
@@ -391,7 +392,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
}
if(!repeat)cacheStats.miss(caching);
if(!repeat) cacheStats.miss(caching);
return null;
}
@@ -430,10 +431,12 @@ public class BucketCache implements BlockCache, HeapSize {
cacheStats.evicted();
return true;
}
/*
* Statistics thread. Periodically prints the cache statistics to the log.
* Statistics thread. Periodically outputs cache statistics to the log.
*/
// TODO: Fix. We run a thread to log at DEBUG. If no DEBUG level, we still run the thread!
// A thread just to log is OTT. FIX.
private static class StatisticsThread extends Thread {
BucketCache bucketCache;
@@ -447,7 +450,7 @@ public class BucketCache implements BlockCache, HeapSize {
bucketCache.logStats();
}
}
public void logStats() {
if (!LOG.isDebugEnabled()) return;
// Log size
@@ -481,10 +484,6 @@ public class BucketCache implements BlockCache, HeapSize {
return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
}
private long minSize() {
return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MIN_FACTOR);
}
private long singleSize() {
return (long) Math.floor(bucketAllocator.getTotalSize()
* DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
@@ -1193,5 +1192,4 @@ public class BucketCache implements BlockCache, HeapSize {
writerThread.join();
}
}
}
}

View File

@@ -25,12 +25,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferArray;
/**
* IO engine that stores data on the memory using an array of ByteBuffers
* IO engine that stores data in memory using an array of ByteBuffers
* {@link ByteBufferArray}
*/
@InterfaceAudience.Private
public class ByteBufferIOEngine implements IOEngine {
private ByteBufferArray bufferArray;
/**
@@ -88,7 +87,7 @@ public class ByteBufferIOEngine implements IOEngine {
*/
@Override
public void sync() {
// Nothing to do.
}
/**
@@ -96,6 +95,6 @@ public class ByteBufferIOEngine implements IOEngine {
*/
@Override
public void shutdown() {
// Nothing to do.
}
}
}

View File

@@ -53,6 +53,8 @@ public class FileIOEngine implements IOEngine {
+ StringUtils.byteDesc(fileSize), ioex);
if (raf != null) raf.close();
throw ioex;
} finally {
if (raf != null) raf.close();
}
}

View File

@@ -24,12 +24,11 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A class implementing IOEngine interface could support data services for
* A class implementing the IOEngine interface supports data services for
* {@link BucketCache}.
*/
@InterfaceAudience.Private
public interface IOEngine {
/**
* @return true if persistent storage is supported for the cache when shutdown
*/
@@ -63,4 +62,4 @@ public interface IOEngine {
* Shutdown the IOEngine
*/
void shutdown();
}
}

View File

@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache}, an implementation of
* {@link org.apache.hadoop.hbase.io.hfile.BlockCache}.
* See {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} for how it works.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

View File

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides implementations of {@link HFile} and HFile
* {@link org.apache.hadoop.hbase.io.hfile.BlockCache}. Caches are configured (and instantiated)
* by {@link org.apache.hadoop.hbase.io.hfile.CacheConfig}. See head of the
* {@link org.apache.hadoop.hbase.io.hfile.CacheConfig} class for constants that define
* cache options and configuration keys to use when setting cache options. Cache implementations
* include the on-heap {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache},
* a {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} that can serve as an L2 for
* {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}, and a
* {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} that has a number of deploy
* types: it can be an L2 for LruBlockCache, or, via
* {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}, it can host data blocks with
* meta blocks kept in the LruBlockCache; it offers onheap, offheap, and file options.
*
* <h1>Enabling {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache}</h1>
* {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} has seen little use and will likely
* be deprecated in the near future. To enable it,
* set the float <code>hbase.offheapcache.percentage</code> to some value between 0 and 1. This
* enables {@link org.apache.hadoop.hbase.io.hfile.DoubleBlockCache}, a facade over
* {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} and
* {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache}. The value set here will be
* multiplied by whatever the setting for <code>-XX:MaxDirectMemorySize</code> is and this is what
* will be used by {@link org.apache.hadoop.hbase.io.hfile.slab.SlabCache} as its offheap store.
*/
package org.apache.hadoop.hbase.io.hfile;
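To make the sizing arithmetic above concrete, a small illustrative calculation (the values are hypothetical, not from the commit):

// Illustrative only: how SlabCache's offheap store size is derived from the
// hbase.offheapcache.percentage float and -XX:MaxDirectMemorySize.
public class SlabSizingSketch {
  public static void main(String[] args) {
    long maxDirectMemory = 4L * 1024 * 1024 * 1024; // -XX:MaxDirectMemorySize=4g
    float offheapPercentage = 0.5f; // hbase.offheapcache.percentage
    long slabStoreSize = (long) (maxDirectMemory * offheapPercentage); // 2 GB
    System.out.println("SlabCache offheap store: " + slabStoreSize + " bytes");
  }
}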

View File

@@ -1,128 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Random seek test.
*/
public class RandomSeek {
private static List<String> slurp(String fname) throws IOException {
BufferedReader istream = new BufferedReader(new FileReader(fname));
String str;
List<String> l = new ArrayList<String>();
while ( (str=istream.readLine()) != null) {
String [] parts = str.split(",");
l.add(parts[0] + ":" + parts[1] + ":" + parts[2]);
}
istream.close();
return l;
}
private static String randKey(List<String> keys) {
Random r = new Random();
//return keys.get(r.nextInt(keys.size()));
return "2" + Integer.toString(7+r.nextInt(2)) + Integer.toString(r.nextInt(100));
//return new String(r.nextInt(100));
}
public static void main(String [] argv) throws IOException {
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 64*1024);
RawLocalFileSystem rlfs = new RawLocalFileSystem();
rlfs.setConf(conf);
LocalFileSystem lfs = new LocalFileSystem(rlfs);
Path path = new Path("/Users/ryan/rfile.big.txt");
long start = System.currentTimeMillis();
SimpleBlockCache cache = new SimpleBlockCache();
CacheConfig cacheConf = new CacheConfig(cache, true, false, false, false,
false, false, false);
Reader reader = HFile.createReader(lfs, path, cacheConf, conf);
reader.loadFileInfo();
System.out.println(reader.getTrailer());
long end = System.currentTimeMillis();
System.out.println("Index read time: " + (end - start));
List<String> keys = slurp("/Users/ryan/xaa.50k");
// Get a scanner that doesn't cache and that uses pread.
HFileScanner scanner = reader.getScanner(false, true);
int count;
long totalBytes = 0;
int notFound = 0;
start = System.nanoTime();
for(count = 0; count < 500000; ++count) {
String key = randKey(keys);
byte [] bkey = Bytes.toBytes(key);
int res = scanner.seekTo(bkey);
if (res == 0) {
ByteBuffer k = scanner.getKey();
ByteBuffer v = scanner.getValue();
totalBytes += k.limit();
totalBytes += v.limit();
} else {
++ notFound;
}
if (res == -1) {
scanner.seekTo();
}
// Scan for another 1000 rows.
for (int i = 0; i < 1000; ++i) {
if (!scanner.next())
break;
ByteBuffer k = scanner.getKey();
ByteBuffer v = scanner.getValue();
totalBytes += k.limit();
totalBytes += v.limit();
}
if ( count % 1000 == 0 ) {
end = System.nanoTime();
System.out.println("Cache block count: " + cache.size() + " dumped: "+ cache.dumps);
//System.out.println("Cache size: " + cache.heapSize());
double msTime = ((end - start) / 1000000.0);
System.out.println("Seeked: "+ count + " in " + msTime + " (ms) "
+ (1000.0 / msTime ) + " seeks/ms "
+ (msTime / 1000.0) + " ms/seek");
start = System.nanoTime();
}
}
System.out.println("Total bytes: " + totalBytes + " not found: " + notFound);
}
}