HBASE-3287 Add option to cache blocks on hfile write and evict blocks on hfile close

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1040762 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Gray 2010-11-30 20:27:50 +00:00
parent 70f5422d72
commit 826fbd99ff
20 changed files with 323 additions and 78 deletions

View File

@ -16,6 +16,10 @@ Release 0.91.0 - Unreleased
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
Andrew Purtell)
NEW FEATURES
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
hfile close
Release 0.90.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -62,7 +62,7 @@ public class HalfStoreFileReader extends StoreFile.Reader {
public HalfStoreFileReader(final FileSystem fs, final Path p, final BlockCache c,
final Reference r)
throws IOException {
super(fs, p, c, false);
super(fs, p, c, false, false);
// This is not actual midkey for this half-file; its just border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/**
* Block cache interface.
* TODO: Add filename or hash of filename to block cache key.
@ -49,6 +51,19 @@ public interface BlockCache {
*/
public ByteBuffer getBlock(String blockName, boolean caching);
/**
* Evict block from cache.
* @param blockName Block name to evict
* @return true if block existed and was evicted, false if not
*/
public boolean evictBlock(String blockName);
/**
* Get the statistics for this block cache.
* @return
*/
public CacheStats getStats();
/**
* Shutdown the cache.
*/

View File

@ -20,10 +20,11 @@
package org.apache.hadoop.hbase.io.hfile;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -260,6 +261,14 @@ public class HFile {
// May be null if we were passed a stream.
private Path path = null;
// Block cache to optionally fill on write
private BlockCache blockCache;
// Additional byte array output stream used to fill block cache
private ByteArrayOutputStream baos;
private DataOutputStream baosDos;
private int blockNumber = 0;
/**
* Constructor that uses all defaults for compression and block size.
* @param fs
@ -268,7 +277,8 @@ public class HFile {
*/
public Writer(FileSystem fs, Path path)
throws IOException {
this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null);
this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null,
null);
}
/**
@ -287,7 +297,7 @@ public class HFile {
this(fs, path, blocksize,
compress == null? DEFAULT_COMPRESSION_ALGORITHM:
Compression.getCompressionAlgorithmByName(compress),
comparator);
comparator, null);
}
/**
@ -301,12 +311,13 @@ public class HFile {
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress,
final KeyComparator comparator)
final KeyComparator comparator, BlockCache blockCache)
throws IOException {
this(fs.create(path), blocksize, compress, comparator);
this.closeOutputStream = true;
this.name = path.toString();
this.path = path;
this.blockCache = blockCache;
}
/**
@ -371,6 +382,17 @@ public class HFile {
writeTime += System.currentTimeMillis() - now;
writeOps++;
if (blockCache != null) {
baosDos.flush();
byte [] bytes = baos.toByteArray();
ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
bytes.length - DATABLOCKMAGIC.length);
String blockName = path.toString() + blockNumber;
blockCache.cacheBlock(blockName, blockToCache);
baosDos.close();
}
blockNumber++;
}
/*
@ -383,6 +405,11 @@ public class HFile {
this.out = getCompressingStream();
this.out.write(DATABLOCKMAGIC);
firstKey = null;
if (blockCache != null) {
this.baos = new ByteArrayOutputStream();
this.baosDos = new DataOutputStream(baos);
this.baosDos.write(DATABLOCKMAGIC);
}
}
/*
@ -552,6 +579,13 @@ public class HFile {
this.lastKeyOffset = koffset;
this.lastKeyLength = klength;
this.entryCount ++;
// If we are pre-caching blocks on write, fill byte array stream
if (blockCache != null) {
this.baosDos.writeInt(klength);
this.baosDos.writeInt(vlength);
this.baosDos.write(key, koffset, klength);
this.baosDos.write(value, voffset, vlength);
}
}
/*
@ -729,6 +763,9 @@ public class HFile {
// Whether file is from in-memory store
private boolean inMemory = false;
// Whether blocks of file should be evicted on close of file
private final boolean evictOnClose;
// Name for this object used when logging or in toString. Is either
// the result of a toString on the stream or else is toString of passed
// file Path plus metadata key/value pairs.
@ -743,9 +780,11 @@ public class HFile {
* @param cache block cache. Pass null if none.
* @throws IOException
*/
public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory)
public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory,
boolean evictOnClose)
throws IOException {
this(fs.open(path), fs.getFileStatus(path).getLen(), cache, inMemory);
this(path, fs.open(path), fs.getFileStatus(path).getLen(), cache,
inMemory, evictOnClose);
this.closeIStream = true;
this.name = path.toString();
}
@ -758,16 +797,20 @@ public class HFile {
* stream.
* @param size Length of the stream.
* @param cache block cache. Pass null if none.
* @param inMemory whether blocks should be marked as in-memory in cache
* @param evictOnClose whether blocks in cache should be evicted on close
* @throws IOException
*/
public Reader(final FSDataInputStream fsdis, final long size,
final BlockCache cache, final boolean inMemory) {
public Reader(Path path, final FSDataInputStream fsdis, final long size,
final BlockCache cache, final boolean inMemory,
final boolean evictOnClose) {
this.cache = cache;
this.fileSize = size;
this.istream = fsdis;
this.closeIStream = false;
this.name = this.istream == null? "": this.istream.toString();
this.name = path.toString();
this.inMemory = inMemory;
this.evictOnClose = evictOnClose;
}
@Override
@ -1192,6 +1235,14 @@ public class HFile {
}
public void close() throws IOException {
if (evictOnClose && this.cache != null) {
int numEvicted = 0;
for (int i=0; i<blockIndex.count; i++) {
if (this.cache.evictBlock(name + i)) numEvicted++;
}
LOG.debug("On close of file " + name + " evicted " + numEvicted +
" block(s) of " + blockIndex.count + " total blocks");
}
if (this.closeIStream && this.istream != null) {
this.istream.close();
this.istream = null;
@ -1898,7 +1949,7 @@ public class HFile {
continue;
}
// create reader and load file info
HFile.Reader reader = new HFile.Reader(fs, file, null, false);
HFile.Reader reader = new HFile.Reader(fs, file, null, false, false);
Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
// scan over file and read key/value's and check if requested
HFileScanner scanner = reader.getScanner(false, false);

View File

@ -290,6 +290,15 @@ public class LruBlockCache implements BlockCache, HeapSize {
return cb.getBuffer();
}
@Override
public boolean evictBlock(String blockName) {
CachedBlock cb = map.get(blockName);
if (cb == null) return false;
evictBlock(cb);
return true;
}
protected long evictBlock(CachedBlock block) {
map.remove(block.getName());
size.addAndGet(-1 * block.heapSize());

View File

@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/**
* Simple one RFile soft reference cache.
@ -83,7 +85,18 @@ public class SimpleBlockCache implements BlockCache {
cache.put(blockName, new Ref(blockName, buf, q));
}
@Override
public boolean evictBlock(String blockName) {
return cache.remove(blockName) != null;
}
public void shutdown() {
// noop
}
@Override
public CacheStats getStats() {
// TODO: implement this if we ever actually use this block cache
return null;
}
}

View File

@ -184,7 +184,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false, false);
final byte[] first, last;
try {
hfr.loadFileInfo();
@ -276,7 +276,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
halfWriter = new StoreFile.Writer(
fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
bloomFilterType, 0);
bloomFilterType, 0, false);
HFileScanner scanner = halfReader.getScanner(false, false);
scanner.seekTo();
do {

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@ -324,7 +325,7 @@ public class Store implements HeapSize {
LOG.info("Validating hfile at " + srcPath + " for inclusion in "
+ "store " + this + " region " + this.region);
reader = new HFile.Reader(srcPath.getFileSystem(conf),
srcPath, null, false);
srcPath, null, false, false);
reader.loadFileInfo();
byte[] firstKey = reader.getFirstRowKey();
@ -527,7 +528,8 @@ public class Store implements HeapSize {
throws IOException {
return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
compression, this.comparator, this.conf,
this.family.getBloomFilterType(), maxKeyCount);
this.family.getBloomFilterType(), maxKeyCount,
conf.getBoolean("hbase.rs.cacheblocksonwrite", false));
}
/*

View File

@ -19,6 +19,23 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -47,23 +64,6 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To
@ -376,7 +376,8 @@ public class StoreFile {
getBlockCache(), this.reference);
} else {
this.reader = new Reader(this.fs, this.path, getBlockCache(),
this.inMemory);
this.inMemory,
this.conf.getBoolean("hbase.rs.evictblocksonclose", true));
}
// Load up indices and fileinfo.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@ -529,7 +530,8 @@ public class StoreFile {
final int blocksize)
throws IOException {
return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0);
return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0,
false);
}
/**
@ -554,7 +556,8 @@ public class StoreFile {
final KeyValue.KVComparator c,
final Configuration conf,
BloomType bloomType,
int maxKeySize)
int maxKeySize,
final boolean cacheOnWrite)
throws IOException {
if (!fs.exists(dir)) {
@ -567,7 +570,8 @@ public class StoreFile {
return new Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize);
conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize,
cacheOnWrite);
}
/**
@ -682,13 +686,17 @@ public class StoreFile {
* @param comparator key comparator
* @param bloomType bloom filter setting
* @param maxKeys maximum amount of keys to add (for blooms)
* @param cacheOnWrite whether to cache blocks as we write file
* @throws IOException problem writing to FS
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress, final Configuration conf,
final KVComparator comparator, BloomType bloomType, int maxKeys)
final KVComparator comparator, BloomType bloomType, int maxKeys,
boolean cacheOnWrite)
throws IOException {
writer = new HFile.Writer(fs, path, blocksize, compress, comparator.getRawComparator());
writer = new HFile.Writer(fs, path, blocksize, compress,
comparator.getRawComparator(),
cacheOnWrite ? StoreFile.getBlockCache(conf) : null);
this.kvComparator = comparator;
@ -894,9 +902,10 @@ public class StoreFile {
protected TimeRangeTracker timeRangeTracker = null;
protected long sequenceID = -1;
public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
public Reader(FileSystem fs, Path path, BlockCache blockCache,
boolean inMemory, boolean evictOnClose)
throws IOException {
reader = new HFile.Reader(fs, path, blockCache, inMemory);
reader = new HFile.Reader(fs, path, blockCache, inMemory, evictOnClose);
bloomFilterType = BloomType.NONE;
}

View File

@ -127,7 +127,7 @@ public class CompressionTest {
writer.appendFileInfo(Bytes.toBytes("infokey"), Bytes.toBytes("infoval"));
writer.close();
HFile.Reader reader = new HFile.Reader(dfs, path, null, false);
HFile.Reader reader = new HFile.Reader(dfs, path, null, false, false);
reader.loadFileInfo();
byte[] key = reader.getFirstKey();
boolean rc = Bytes.toString(key).equals("testkey");

View File

@ -188,7 +188,8 @@ public class HFilePerformanceEvaluation {
@Override
void setUp() throws Exception {
writer = new HFile.Writer(this.fs, this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null);
writer = new HFile.Writer(this.fs, this.mf, RFILE_BLOCKSIZE,
(Compression.Algorithm) null, null, null);
}
@Override
@ -224,7 +225,7 @@ public class HFilePerformanceEvaluation {
@Override
void setUp() throws Exception {
reader = new HFile.Reader(this.fs, this.mf, null, false);
reader = new HFile.Reader(this.fs, this.mf, null, false, false);
this.reader.loadFileInfo();
}

View File

@ -71,7 +71,7 @@ public class TestHalfStoreFileReader {
}
w.close();
HFile.Reader r = new HFile.Reader(fs, p, null, false);
HFile.Reader r = new HFile.Reader(fs, p, null, false, false);
r.loadFileInfo();
byte [] midkey = r.midkey();
KeyValue midKV = KeyValue.createKeyValueFromKey(midkey);

View File

@ -68,7 +68,7 @@ public class RandomSeek {
long start = System.currentTimeMillis();
SimpleBlockCache cache = new SimpleBlockCache();
//LruBlockCache cache = new LruBlockCache();
Reader reader = new HFile.Reader(lfs, path, cache, false);
Reader reader = new HFile.Reader(lfs, path, cache, false, false);
reader.loadFileInfo();
System.out.println(reader.trailer);
long end = System.currentTimeMillis();

View File

@ -65,7 +65,7 @@ public class TestHFile extends HBaseTestCase {
Path f = new Path(ROOT_DIR, getName());
Writer w = new Writer(this.fs, f);
w.close();
Reader r = new Reader(fs, f, null, false);
Reader r = new Reader(fs, f, null, false, false);
r.loadFileInfo();
assertNull(r.getFirstKey());
assertNull(r.getLastKey());
@ -140,8 +140,8 @@ public class TestHFile extends HBaseTestCase {
writeRecords(writer);
fout.close();
FSDataInputStream fin = fs.open(ncTFile);
Reader reader = new Reader(fs.open(ncTFile),
fs.getFileStatus(ncTFile).getLen(), null, false);
Reader reader = new Reader(ncTFile, fs.open(ncTFile),
fs.getFileStatus(ncTFile).getLen(), null, false, false);
// Load up the index.
reader.loadFileInfo();
// Get a scanner that caches and that does not use pread.
@ -215,8 +215,8 @@ public class TestHFile extends HBaseTestCase {
writer.close();
fout.close();
FSDataInputStream fin = fs.open(mFile);
Reader reader = new Reader(fs.open(mFile), this.fs.getFileStatus(mFile)
.getLen(), null, false);
Reader reader = new Reader(mFile, fs.open(mFile),
this.fs.getFileStatus(mFile).getLen(), null, false, false);
reader.loadFileInfo();
// No data -- this should return false.
assertFalse(reader.getScanner(false, false).seekTo());
@ -240,7 +240,7 @@ public class TestHFile extends HBaseTestCase {
writer.append("foo".getBytes(), "value".getBytes());
writer.close();
fout.close();
Reader reader = new Reader(fs, mFile, null, false);
Reader reader = new Reader(fs, mFile, null, false, false);
reader.loadFileInfo();
assertNull(reader.getMetaBlock("non-existant", false));
}

View File

@ -236,8 +236,8 @@ public class TestHFilePerformance extends TestCase {
FSDataInputStream fin = fs.open(path);
if ("HFile".equals(fileType)){
HFile.Reader reader = new HFile.Reader(fs.open(path),
fs.getFileStatus(path).getLen(), null, false);
HFile.Reader reader = new HFile.Reader(path, fs.open(path),
fs.getFileStatus(path).getLen(), null, false, false);
reader.loadFileInfo();
switch (method) {

View File

@ -155,8 +155,8 @@ public class TestHFileSeek extends TestCase {
int miss = 0;
long totalBytes = 0;
FSDataInputStream fsdis = fs.open(path);
Reader reader =
new Reader(fsdis, fs.getFileStatus(path).getLen(), null, false);
Reader reader = new Reader(path, fsdis, fs.getFileStatus(path).getLen(),
null, false, false);
reader.loadFileInfo();
KeySampler kSampler =
new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),

View File

@ -60,7 +60,7 @@ public class TestReseekTo {
fout.close();
HFile.Reader reader = new HFile.Reader(TEST_UTIL.getTestFileSystem(),
ncTFile, null, false);
ncTFile, null, false, false);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, true);

View File

@ -49,7 +49,7 @@ public class TestSeekTo extends HBaseTestCase {
}
public void testSeekBefore() throws Exception {
Path p = makeNewFile();
HFile.Reader reader = new HFile.Reader(fs, p, null, false);
HFile.Reader reader = new HFile.Reader(fs, p, null, false, false);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, true);
assertEquals(false, scanner.seekBefore(Bytes.toBytes("a")));
@ -82,7 +82,7 @@ public class TestSeekTo extends HBaseTestCase {
public void testSeekTo() throws Exception {
Path p = makeNewFile();
HFile.Reader reader = new HFile.Reader(fs, p, null, false);
HFile.Reader reader = new HFile.Reader(fs, p, null, false, false);
reader.loadFileInfo();
assertEquals(2, reader.blockIndex.count);
HFileScanner scanner = reader.getScanner(false, true);
@ -102,7 +102,7 @@ public class TestSeekTo extends HBaseTestCase {
public void testBlockContainingKey() throws Exception {
Path p = makeNewFile();
HFile.Reader reader = new HFile.Reader(fs, p, null, false);
HFile.Reader reader = new HFile.Reader(fs, p, null, false, false);
reader.loadFileInfo();
System.out.println(reader.blockIndex.toString());
// falls before the start of the file.

View File

@ -150,7 +150,7 @@ public class TestLoadIncrementalHFiles {
private int verifyHFile(Path p) throws IOException {
Configuration conf = util.getConfiguration();
HFile.Reader reader = new HFile.Reader(
p.getFileSystem(conf), p, null, false);
p.getFileSystem(conf), p, null, false, false);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();

View File

@ -30,6 +30,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
@ -38,16 +39,15 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.mockito.Mockito;
import com.google.common.base.Joiner;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -324,8 +324,8 @@ public class TestStoreFile extends HBaseTestCase {
private static String ROOT_DIR =
HBaseTestingUtility.getTestDir("TestStoreFile").toString();
private static String localFormatter = "%010d";
private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs)
private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs)
throws Exception {
float err = conf.getFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
Path f = writer.getPath();
@ -338,7 +338,7 @@ public class TestStoreFile extends HBaseTestCase {
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false, false);
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
@ -368,7 +368,7 @@ public class TestStoreFile extends HBaseTestCase {
if (!(falsePos <= 2* 2000 * err)) {
System.out.println("WTFBBQ! " + falsePos + ", " + (2* 2000 * err) );
}
assertTrue(falsePos <= 2* 2000 * err);
assertTrue(falsePos <= 2* 2000 * err);
}
public void testBloomFilter() throws Exception {
@ -380,7 +380,7 @@ public class TestStoreFile extends HBaseTestCase {
Path f = new Path(ROOT_DIR, getName());
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000, false);
bloomWriteRead(writer, fs);
}
@ -411,7 +411,7 @@ public class TestStoreFile extends HBaseTestCase {
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, bt[x], expKeys[x]);
conf, KeyValue.COMPARATOR, bt[x], expKeys[x], false);
long now = System.currentTimeMillis();
for (int i = 0; i < rowCount*2; i += 2) { // rows
@ -428,7 +428,7 @@ public class TestStoreFile extends HBaseTestCase {
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false, false);
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
@ -466,7 +466,7 @@ public class TestStoreFile extends HBaseTestCase {
assertTrue(falsePos < 2*expErr[x]);
}
}
public void testBloomEdgeCases() throws Exception {
float err = (float)0.005;
FileSystem fs = FileSystem.getLocal(conf);
@ -474,15 +474,15 @@ public class TestStoreFile extends HBaseTestCase {
conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, err);
conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, 1000);
// this should not create a bloom because the max keys is too small
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000, false);
assertFalse(writer.hasBloom());
writer.close();
fs.delete(f, true);
conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, Integer.MAX_VALUE);
// TODO: commented out because we run out of java heap space on trunk
@ -495,17 +495,18 @@ public class TestStoreFile extends HBaseTestCase {
assertTrue(writer.hasBloom());
bloomWriteRead(writer, fs);
*/
// this, however, is too large and should not create a bloom
// because Java can't create a contiguous array > MAX_INT
writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE);
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE,
false);
assertFalse(writer.hasBloom());
writer.close();
fs.delete(f, true);
}
public void testFlushTimeComparator() {
assertOrdering(StoreFile.Comparators.FLUSH_TIME,
mockStoreFile(true, 1000, -1, "/foo/123"),
@ -516,7 +517,7 @@ public class TestStoreFile extends HBaseTestCase {
mockStoreFile(false, -1, 5, "/foo/2"),
mockStoreFile(false, -1, 5, "/foo/3"));
}
/**
* Assert that the given comparator orders the given storefiles in the
* same way that they're passed.
@ -626,4 +627,144 @@ public class TestStoreFile extends HBaseTestCase {
//scan.setTimeRange(27, 50);
//assertTrue(!scanner.shouldSeek(scan, columns));
}
public void testCacheOnWriteEvictOnClose() throws Exception {
Configuration conf = this.conf;
conf.setBoolean("hbase.rs.evictblocksonclose", false);
// Find a home for our files
Path baseDir = new Path(new Path(this.testDir, "regionname"),
"twoCOWEOC");
// Grab the block cache and get the initial hit/miss counts
BlockCache bc = StoreFile.getBlockCache(conf);
assertNotNull(bc);
CacheStats cs = bc.getStats();
long startHit = cs.getHitCount();
long startMiss = cs.getMissCount();
long startEvicted = cs.getEvictedCount();
// Let's write a StoreFile with three blocks, with cache on write off
conf.setBoolean("hbase.rs.cacheblocksonwrite", false);
Path pathCowOff = new Path(baseDir, "123456789");
StoreFile.Writer writer = writeStoreFile(conf, pathCowOff, 3);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
StoreFile.BloomType.NONE, false);
LOG.debug(hsf.getPath().toString());
// Read this file, we should see 3 misses
StoreFile.Reader reader = hsf.createReader();
reader.loadFileInfo();
StoreFileScanner scanner = reader.getStoreFileScanner(true, true);
scanner.seek(KeyValue.LOWESTKEY);
while (scanner.next() != null);
assertEquals(startHit, cs.getHitCount());
assertEquals(startMiss + 3, cs.getMissCount());
assertEquals(startEvicted, cs.getEvictedCount());
startMiss += 3;
scanner.close();
reader.close();
// Now write a StoreFile with three blocks, with cache on write on
conf.setBoolean("hbase.rs.cacheblocksonwrite", true);
Path pathCowOn = new Path(baseDir, "123456788");
writer = writeStoreFile(conf, pathCowOn, 3);
hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
StoreFile.BloomType.NONE, false);
// Read this file, we should see 3 hits
reader = hsf.createReader();
scanner = reader.getStoreFileScanner(true, true);
scanner.seek(KeyValue.LOWESTKEY);
while (scanner.next() != null);
assertEquals(startHit + 3, cs.getHitCount());
assertEquals(startMiss, cs.getMissCount());
assertEquals(startEvicted, cs.getEvictedCount());
startHit += 3;
scanner.close();
reader.close();
// Let's read back the two files to ensure the blocks exactly match
hsf = new StoreFile(this.fs, pathCowOff, true, conf,
StoreFile.BloomType.NONE, false);
StoreFile.Reader readerOne = hsf.createReader();
readerOne.loadFileInfo();
StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true);
scannerOne.seek(KeyValue.LOWESTKEY);
hsf = new StoreFile(this.fs, pathCowOn, true, conf,
StoreFile.BloomType.NONE, false);
StoreFile.Reader readerTwo = hsf.createReader();
readerTwo.loadFileInfo();
StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true);
scannerTwo.seek(KeyValue.LOWESTKEY);
KeyValue kv1 = null;
KeyValue kv2 = null;
while ((kv1 = scannerOne.next()) != null) {
kv2 = scannerTwo.next();
assertTrue(kv1.equals(kv2));
assertTrue(Bytes.equals(kv1.getBuffer(), kv2.getBuffer()));
}
assertNull(scannerTwo.next());
assertEquals(startHit + 6, cs.getHitCount());
assertEquals(startMiss, cs.getMissCount());
assertEquals(startEvicted, cs.getEvictedCount());
startHit += 6;
scannerOne.close();
readerOne.close();
scannerTwo.close();
readerTwo.close();
// Let's close the first file with evict on close turned on
conf.setBoolean("hbase.rs.evictblocksonclose", true);
hsf = new StoreFile(this.fs, pathCowOff, true, conf,
StoreFile.BloomType.NONE, false);
reader = hsf.createReader();
reader.close();
// We should have 3 new evictions
assertEquals(startHit, cs.getHitCount());
assertEquals(startMiss, cs.getMissCount());
assertEquals(startEvicted + 3, cs.getEvictedCount());
startEvicted += 3;
// Let's close the second file with evict on close turned off
conf.setBoolean("hbase.rs.evictblocksonclose", false);
hsf = new StoreFile(this.fs, pathCowOn, true, conf,
StoreFile.BloomType.NONE, false);
reader = hsf.createReader();
reader.close();
// We expect no changes
assertEquals(startHit, cs.getHitCount());
assertEquals(startMiss, cs.getMissCount());
assertEquals(startEvicted, cs.getEvictedCount());
}
private StoreFile.Writer writeStoreFile(Configuration conf, Path path,
int numBlocks)
throws IOException {
// Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs
int numKVs = 5 * numBlocks;
List<KeyValue> kvs = new ArrayList<KeyValue>(numKVs);
byte [] b = Bytes.toBytes("x");
int totalSize = 0;
for (int i=numKVs;i>0;i--) {
KeyValue kv = new KeyValue(b, b, b, i, b);
kvs.add(kv);
totalSize += kv.getLength();
}
int blockSize = totalSize / numBlocks;
StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.NONE, 2000,
conf.getBoolean("hbase.rs.cacheblocksonwrite", false));
// We'll write N-1 KVs to ensure we don't write an extra block
kvs.remove(kvs.size()-1);
for (KeyValue kv : kvs) {
writer.append(kv);
}
writer.appendMetadata(0, false);
writer.close();
return writer;
}
}