From 826fbd99ffa54adbaced59e788da6f7d506e9884 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Tue, 30 Nov 2010 20:27:50 +0000 Subject: [PATCH] HBASE-3287 Add option to cache blocks on hfile write and evict blocks on hfile close git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1040762 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + .../hadoop/hbase/io/HalfStoreFileReader.java | 2 +- .../hadoop/hbase/io/hfile/BlockCache.java | 15 ++ .../apache/hadoop/hbase/io/hfile/HFile.java | 71 ++++++- .../hadoop/hbase/io/hfile/LruBlockCache.java | 9 + .../hbase/io/hfile/SimpleBlockCache.java | 13 ++ .../mapreduce/LoadIncrementalHFiles.java | 4 +- .../hadoop/hbase/regionserver/Store.java | 6 +- .../hadoop/hbase/regionserver/StoreFile.java | 59 +++--- .../hadoop/hbase/util/CompressionTest.java | 2 +- .../hbase/HFilePerformanceEvaluation.java | 5 +- .../hbase/io/TestHalfStoreFileReader.java | 2 +- .../hadoop/hbase/io/hfile/RandomSeek.java | 2 +- .../hadoop/hbase/io/hfile/TestHFile.java | 12 +- .../hbase/io/hfile/TestHFilePerformance.java | 4 +- .../hadoop/hbase/io/hfile/TestHFileSeek.java | 4 +- .../hadoop/hbase/io/hfile/TestReseekTo.java | 2 +- .../hadoop/hbase/io/hfile/TestSeekTo.java | 6 +- .../mapreduce/TestLoadIncrementalHFiles.java | 2 +- .../hbase/regionserver/TestStoreFile.java | 177 ++++++++++++++++-- 20 files changed, 323 insertions(+), 78 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c7b830ae47d..094431bc9b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,10 @@ Release 0.91.0 - Unreleased HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via Andrew Purtell) + NEW FEATURES + HBASE-3287 Add option to cache blocks on hfile write and evict blocks on + hfile close + Release 0.90.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 40be6497707..224d3d204a0 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -62,7 +62,7 @@ public class HalfStoreFileReader extends StoreFile.Reader { public HalfStoreFileReader(final FileSystem fs, final Path p, final BlockCache c, final Reference r) throws IOException { - super(fs, p, c, false); + super(fs, p, c, false, false); // This is not actual midkey for this half-file; its just border // around which we split top and bottom. Have to look in files to find // actual last and first keys for bottom and top halves. Half-files don't diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 3ef0780a935..c5c9f1d810c 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.io.hfile; import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; + /** * Block cache interface. * TODO: Add filename or hash of filename to block cache key. @@ -49,6 +51,19 @@ public interface BlockCache { */ public ByteBuffer getBlock(String blockName, boolean caching); + /** + * Evict block from cache. + * @param blockName Block name to evict + * @return true if block existed and was evicted, false if not + */ + public boolean evictBlock(String blockName); + + /** + * Get the statistics for this block cache. 
+ * @return + */ + public CacheStats getStats(); + /** * Shutdown the cache. */ diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index e34c334957b..b53c550513c 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -20,10 +20,11 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -260,6 +261,14 @@ public class HFile { // May be null if we were passed a stream. private Path path = null; + // Block cache to optionally fill on write + private BlockCache blockCache; + + // Additional byte array output stream used to fill block cache + private ByteArrayOutputStream baos; + private DataOutputStream baosDos; + private int blockNumber = 0; + /** * Constructor that uses all defaults for compression and block size. * @param fs @@ -268,7 +277,8 @@ public class HFile { */ public Writer(FileSystem fs, Path path) throws IOException { - this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null); + this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null, + null); } /** @@ -287,7 +297,7 @@ public class HFile { this(fs, path, blocksize, compress == null? DEFAULT_COMPRESSION_ALGORITHM: Compression.getCompressionAlgorithmByName(compress), - comparator); + comparator, null); } /** @@ -301,12 +311,13 @@ public class HFile { */ public Writer(FileSystem fs, Path path, int blocksize, Compression.Algorithm compress, - final KeyComparator comparator) + final KeyComparator comparator, BlockCache blockCache) throws IOException { this(fs.create(path), blocksize, compress, comparator); this.closeOutputStream = true; this.name = path.toString(); this.path = path; + this.blockCache = blockCache; } /** @@ -371,6 +382,17 @@ public class HFile { writeTime += System.currentTimeMillis() - now; writeOps++; + + if (blockCache != null) { + baosDos.flush(); + byte [] bytes = baos.toByteArray(); + ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length, + bytes.length - DATABLOCKMAGIC.length); + String blockName = path.toString() + blockNumber; + blockCache.cacheBlock(blockName, blockToCache); + baosDos.close(); + } + blockNumber++; } /* @@ -383,6 +405,11 @@ public class HFile { this.out = getCompressingStream(); this.out.write(DATABLOCKMAGIC); firstKey = null; + if (blockCache != null) { + this.baos = new ByteArrayOutputStream(); + this.baosDos = new DataOutputStream(baos); + this.baosDos.write(DATABLOCKMAGIC); + } } /* @@ -552,6 +579,13 @@ public class HFile { this.lastKeyOffset = koffset; this.lastKeyLength = klength; this.entryCount ++; + // If we are pre-caching blocks on write, fill byte array stream + if (blockCache != null) { + this.baosDos.writeInt(klength); + this.baosDos.writeInt(vlength); + this.baosDos.write(key, koffset, klength); + this.baosDos.write(value, voffset, vlength); + } } /* @@ -729,6 +763,9 @@ public class HFile { // Whether file is from in-memory store private boolean inMemory = false; + // Whether blocks of file should be evicted on close of file + private final boolean evictOnClose; + // Name for this object used when logging or in toString. 
Is either // the result of a toString on the stream or else is toString of passed // file Path plus metadata key/value pairs. @@ -743,9 +780,11 @@ public class HFile { * @param cache block cache. Pass null if none. * @throws IOException */ - public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory) + public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory, + boolean evictOnClose) throws IOException { - this(fs.open(path), fs.getFileStatus(path).getLen(), cache, inMemory); + this(path, fs.open(path), fs.getFileStatus(path).getLen(), cache, + inMemory, evictOnClose); this.closeIStream = true; this.name = path.toString(); } @@ -758,16 +797,20 @@ public class HFile { * stream. * @param size Length of the stream. * @param cache block cache. Pass null if none. + * @param inMemory whether blocks should be marked as in-memory in cache + * @param evictOnClose whether blocks in cache should be evicted on close * @throws IOException */ - public Reader(final FSDataInputStream fsdis, final long size, - final BlockCache cache, final boolean inMemory) { + public Reader(Path path, final FSDataInputStream fsdis, final long size, + final BlockCache cache, final boolean inMemory, + final boolean evictOnClose) { this.cache = cache; this.fileSize = size; this.istream = fsdis; this.closeIStream = false; - this.name = this.istream == null? "": this.istream.toString(); + this.name = path.toString(); this.inMemory = inMemory; + this.evictOnClose = evictOnClose; } @Override @@ -1192,6 +1235,14 @@ public class HFile { } public void close() throws IOException { + if (evictOnClose && this.cache != null) { + int numEvicted = 0; + for (int i=0; i fileInfo = reader.loadFileInfo(); // scan over file and read key/value's and check if requested HFileScanner scanner = reader.getScanner(false, false); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 4ecad53540c..009ee3ca661 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -290,6 +290,15 @@ public class LruBlockCache implements BlockCache, HeapSize { return cb.getBuffer(); } + + @Override + public boolean evictBlock(String blockName) { + CachedBlock cb = map.get(blockName); + if (cb == null) return false; + evictBlock(cb); + return true; + } + protected long evictBlock(CachedBlock block) { map.remove(block.getName()); size.addAndGet(-1 * block.heapSize()); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java index 088333f77e7..38d665ce3e4 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java @@ -25,6 +25,8 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; + /** * Simple one RFile soft reference cache. 
@@ -83,7 +85,18 @@ public class SimpleBlockCache implements BlockCache { cache.put(blockName, new Ref(blockName, buf, q)); } + @Override + public boolean evictBlock(String blockName) { + return cache.remove(blockName) != null; + } + public void shutdown() { // noop } + + @Override + public CacheStats getStats() { + // TODO: implement this if we ever actually use this block cache + return null; + } } diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index e051c5826e2..88c4b2f75f9 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -184,7 +184,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throws IOException { final Path hfilePath = item.hfilePath; final FileSystem fs = hfilePath.getFileSystem(getConf()); - HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false); + HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false, false); final byte[] first, last; try { hfr.loadFileInfo(); @@ -276,7 +276,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { halfWriter = new StoreFile.Writer( fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR, - bloomFilterType, 0); + bloomFilterType, 0, false); HFileScanner scanner = halfReader.getScanner(false, false); scanner.seekTo(); do { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index aa354ffebf7..929e500cba7 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; @@ -324,7 +325,7 @@ public class Store implements HeapSize { LOG.info("Validating hfile at " + srcPath + " for inclusion in " + "store " + this + " region " + this.region); reader = new HFile.Reader(srcPath.getFileSystem(conf), - srcPath, null, false); + srcPath, null, false, false); reader.loadFileInfo(); byte[] firstKey = reader.getFirstRowKey(); @@ -527,7 +528,8 @@ public class Store implements HeapSize { throws IOException { return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize, compression, this.comparator, this.conf, - this.family.getBloomFilterType(), maxKeyCount); + this.family.getBloomFilterType(), maxKeyCount, + conf.getBoolean("hbase.rs.cacheblocksonwrite", false)); } /* diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 33e44707c13..0e170be8cbc 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -19,6 +19,23 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; +import java.nio.ByteBuffer; +import java.text.NumberFormat; +import 
java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -47,23 +64,6 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; -import java.nio.ByteBuffer; -import java.text.NumberFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - /** * A Store data file. Stores usually have one or more of these files. They * are produced by flushing the memstore to disk. To @@ -376,7 +376,8 @@ public class StoreFile { getBlockCache(), this.reference); } else { this.reader = new Reader(this.fs, this.path, getBlockCache(), - this.inMemory); + this.inMemory, + this.conf.getBoolean("hbase.rs.evictblocksonclose", true)); } // Load up indices and fileinfo. metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); @@ -529,7 +530,8 @@ public class StoreFile { final int blocksize) throws IOException { - return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0); + return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0, + false); } /** @@ -554,7 +556,8 @@ public class StoreFile { final KeyValue.KVComparator c, final Configuration conf, BloomType bloomType, - int maxKeySize) + int maxKeySize, + final boolean cacheOnWrite) throws IOException { if (!fs.exists(dir)) { @@ -567,7 +570,8 @@ public class StoreFile { return new Writer(fs, path, blocksize, algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm, - conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize); + conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize, + cacheOnWrite); } /** @@ -682,13 +686,17 @@ public class StoreFile { * @param comparator key comparator * @param bloomType bloom filter setting * @param maxKeys maximum amount of keys to add (for blooms) + * @param cacheOnWrite whether to cache blocks as we write file * @throws IOException problem writing to FS */ public Writer(FileSystem fs, Path path, int blocksize, Compression.Algorithm compress, final Configuration conf, - final KVComparator comparator, BloomType bloomType, int maxKeys) + final KVComparator comparator, BloomType bloomType, int maxKeys, + boolean cacheOnWrite) throws IOException { - writer = new HFile.Writer(fs, path, blocksize, compress, comparator.getRawComparator()); + writer = new HFile.Writer(fs, path, blocksize, compress, + comparator.getRawComparator(), + cacheOnWrite ? 
StoreFile.getBlockCache(conf) : null); this.kvComparator = comparator; @@ -894,9 +902,10 @@ public class StoreFile { protected TimeRangeTracker timeRangeTracker = null; protected long sequenceID = -1; - public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory) + public Reader(FileSystem fs, Path path, BlockCache blockCache, + boolean inMemory, boolean evictOnClose) throws IOException { - reader = new HFile.Reader(fs, path, blockCache, inMemory); + reader = new HFile.Reader(fs, path, blockCache, inMemory, evictOnClose); bloomFilterType = BloomType.NONE; } diff --git a/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java index b15f75623e5..ee241918a17 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java +++ b/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java @@ -127,7 +127,7 @@ public class CompressionTest { writer.appendFileInfo(Bytes.toBytes("infokey"), Bytes.toBytes("infoval")); writer.close(); - HFile.Reader reader = new HFile.Reader(dfs, path, null, false); + HFile.Reader reader = new HFile.Reader(dfs, path, null, false, false); reader.loadFileInfo(); byte[] key = reader.getFirstKey(); boolean rc = Bytes.toString(key).equals("testkey"); diff --git a/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java index c8de05cae41..ad29e6022de 100644 --- a/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java +++ b/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java @@ -188,7 +188,8 @@ public class HFilePerformanceEvaluation { @Override void setUp() throws Exception { - writer = new HFile.Writer(this.fs, this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null); + writer = new HFile.Writer(this.fs, this.mf, RFILE_BLOCKSIZE, + (Compression.Algorithm) null, null, null); } @Override @@ -224,7 +225,7 @@ public class HFilePerformanceEvaluation { @Override void setUp() throws Exception { - reader = new HFile.Reader(this.fs, this.mf, null, false); + reader = new HFile.Reader(this.fs, this.mf, null, false, false); this.reader.loadFileInfo(); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java b/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java index 47f4c391057..7cb4d9df87d 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java +++ b/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java @@ -71,7 +71,7 @@ public class TestHalfStoreFileReader { } w.close(); - HFile.Reader r = new HFile.Reader(fs, p, null, false); + HFile.Reader r = new HFile.Reader(fs, p, null, false, false); r.loadFileInfo(); byte [] midkey = r.midkey(); KeyValue midKV = KeyValue.createKeyValueFromKey(midkey); diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java index 61b20cab648..04954c75074 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomSeek.java @@ -68,7 +68,7 @@ public class RandomSeek { long start = System.currentTimeMillis(); SimpleBlockCache cache = new SimpleBlockCache(); //LruBlockCache cache = new LruBlockCache(); - Reader reader = new HFile.Reader(lfs, path, cache, false); + Reader reader = new HFile.Reader(lfs, path, cache, false, false); reader.loadFileInfo(); System.out.println(reader.trailer); long 
end = System.currentTimeMillis(); diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 94aff3d165a..9d071c7788b 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -65,7 +65,7 @@ public class TestHFile extends HBaseTestCase { Path f = new Path(ROOT_DIR, getName()); Writer w = new Writer(this.fs, f); w.close(); - Reader r = new Reader(fs, f, null, false); + Reader r = new Reader(fs, f, null, false, false); r.loadFileInfo(); assertNull(r.getFirstKey()); assertNull(r.getLastKey()); @@ -140,8 +140,8 @@ public class TestHFile extends HBaseTestCase { writeRecords(writer); fout.close(); FSDataInputStream fin = fs.open(ncTFile); - Reader reader = new Reader(fs.open(ncTFile), - fs.getFileStatus(ncTFile).getLen(), null, false); + Reader reader = new Reader(ncTFile, fs.open(ncTFile), + fs.getFileStatus(ncTFile).getLen(), null, false, false); // Load up the index. reader.loadFileInfo(); // Get a scanner that caches and that does not use pread. @@ -215,8 +215,8 @@ public class TestHFile extends HBaseTestCase { writer.close(); fout.close(); FSDataInputStream fin = fs.open(mFile); - Reader reader = new Reader(fs.open(mFile), this.fs.getFileStatus(mFile) - .getLen(), null, false); + Reader reader = new Reader(mFile, fs.open(mFile), + this.fs.getFileStatus(mFile).getLen(), null, false, false); reader.loadFileInfo(); // No data -- this should return false. assertFalse(reader.getScanner(false, false).seekTo()); @@ -240,7 +240,7 @@ public class TestHFile extends HBaseTestCase { writer.append("foo".getBytes(), "value".getBytes()); writer.close(); fout.close(); - Reader reader = new Reader(fs, mFile, null, false); + Reader reader = new Reader(fs, mFile, null, false, false); reader.loadFileInfo(); assertNull(reader.getMetaBlock("non-existant", false)); } diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java index d99fc1cf691..e15efc40ed7 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java @@ -236,8 +236,8 @@ public class TestHFilePerformance extends TestCase { FSDataInputStream fin = fs.open(path); if ("HFile".equals(fileType)){ - HFile.Reader reader = new HFile.Reader(fs.open(path), - fs.getFileStatus(path).getLen(), null, false); + HFile.Reader reader = new HFile.Reader(path, fs.open(path), + fs.getFileStatus(path).getLen(), null, false, false); reader.loadFileInfo(); switch (method) { diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java index 307e6421d98..abc49174d5d 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java @@ -155,8 +155,8 @@ public class TestHFileSeek extends TestCase { int miss = 0; long totalBytes = 0; FSDataInputStream fsdis = fs.open(path); - Reader reader = - new Reader(fsdis, fs.getFileStatus(path).getLen(), null, false); + Reader reader = new Reader(path, fsdis, fs.getFileStatus(path).getLen(), + null, false, false); reader.loadFileInfo(); KeySampler kSampler = new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(), diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java 
b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java index 1eb1cb66241..f01e7fd7d4f 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java @@ -60,7 +60,7 @@ public class TestReseekTo { fout.close(); HFile.Reader reader = new HFile.Reader(TEST_UTIL.getTestFileSystem(), - ncTFile, null, false); + ncTFile, null, false, false); reader.loadFileInfo(); HFileScanner scanner = reader.getScanner(false, true); diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java index d2ba71f3fb6..79445ed89b8 100644 --- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java @@ -49,7 +49,7 @@ public class TestSeekTo extends HBaseTestCase { } public void testSeekBefore() throws Exception { Path p = makeNewFile(); - HFile.Reader reader = new HFile.Reader(fs, p, null, false); + HFile.Reader reader = new HFile.Reader(fs, p, null, false, false); reader.loadFileInfo(); HFileScanner scanner = reader.getScanner(false, true); assertEquals(false, scanner.seekBefore(Bytes.toBytes("a"))); @@ -82,7 +82,7 @@ public class TestSeekTo extends HBaseTestCase { public void testSeekTo() throws Exception { Path p = makeNewFile(); - HFile.Reader reader = new HFile.Reader(fs, p, null, false); + HFile.Reader reader = new HFile.Reader(fs, p, null, false, false); reader.loadFileInfo(); assertEquals(2, reader.blockIndex.count); HFileScanner scanner = reader.getScanner(false, true); @@ -102,7 +102,7 @@ public class TestSeekTo extends HBaseTestCase { public void testBlockContainingKey() throws Exception { Path p = makeNewFile(); - HFile.Reader reader = new HFile.Reader(fs, p, null, false); + HFile.Reader reader = new HFile.Reader(fs, p, null, false, false); reader.loadFileInfo(); System.out.println(reader.blockIndex.toString()); // falls before the start of the file. 
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 1f0eb94ddbd..b73721a83f1 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -150,7 +150,7 @@ public class TestLoadIncrementalHFiles { private int verifyHFile(Path p) throws IOException { Configuration conf = util.getConfiguration(); HFile.Reader reader = new HFile.Reader( - p.getFileSystem(conf), p, null, false); + p.getFileSystem(conf), p, null, false, false); reader.loadFileInfo(); HFileScanner scanner = reader.getScanner(false, false); scanner.seekTo(); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index 0a6872b3eae..7bdf6f1206d 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -30,6 +30,7 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestCase; @@ -38,16 +39,15 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.Reference.Range; +import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.util.ByteBloomFilter; +import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.mockito.Mockito; import com.google.common.base.Joiner; -import com.google.common.collect.Collections2; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -324,8 +324,8 @@ public class TestStoreFile extends HBaseTestCase { private static String ROOT_DIR = HBaseTestingUtility.getTestDir("TestStoreFile").toString(); private static String localFormatter = "%010d"; - - private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs) + + private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs) throws Exception { float err = conf.getFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, 0); Path f = writer.getPath(); @@ -338,7 +338,7 @@ public class TestStoreFile extends HBaseTestCase { } writer.close(); - StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false, false); reader.loadFileInfo(); reader.loadBloomfilter(); StoreFileScanner scanner = reader.getStoreFileScanner(false, false); @@ -368,7 +368,7 @@ public class TestStoreFile extends HBaseTestCase { if (!(falsePos <= 2* 2000 * err)) { System.out.println("WTFBBQ! 
" + falsePos + ", " + (2* 2000 * err) ); } - assertTrue(falsePos <= 2* 2000 * err); + assertTrue(falsePos <= 2* 2000 * err); } public void testBloomFilter() throws Exception { @@ -380,7 +380,7 @@ public class TestStoreFile extends HBaseTestCase { Path f = new Path(ROOT_DIR, getName()); StoreFile.Writer writer = new StoreFile.Writer(fs, f, StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, - conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000); + conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000, false); bloomWriteRead(writer, fs); } @@ -411,7 +411,7 @@ public class TestStoreFile extends HBaseTestCase { StoreFile.Writer writer = new StoreFile.Writer(fs, f, StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, - conf, KeyValue.COMPARATOR, bt[x], expKeys[x]); + conf, KeyValue.COMPARATOR, bt[x], expKeys[x], false); long now = System.currentTimeMillis(); for (int i = 0; i < rowCount*2; i += 2) { // rows @@ -428,7 +428,7 @@ public class TestStoreFile extends HBaseTestCase { } writer.close(); - StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false, false); reader.loadFileInfo(); reader.loadBloomfilter(); StoreFileScanner scanner = reader.getStoreFileScanner(false, false); @@ -466,7 +466,7 @@ public class TestStoreFile extends HBaseTestCase { assertTrue(falsePos < 2*expErr[x]); } } - + public void testBloomEdgeCases() throws Exception { float err = (float)0.005; FileSystem fs = FileSystem.getLocal(conf); @@ -474,15 +474,15 @@ public class TestStoreFile extends HBaseTestCase { conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, err); conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true); conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, 1000); - + // this should not create a bloom because the max keys is too small StoreFile.Writer writer = new StoreFile.Writer(fs, f, StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, - conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000); + conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000, false); assertFalse(writer.hasBloom()); writer.close(); fs.delete(f, true); - + conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, Integer.MAX_VALUE); // TODO: commented out because we run out of java heap space on trunk @@ -495,17 +495,18 @@ public class TestStoreFile extends HBaseTestCase { assertTrue(writer.hasBloom()); bloomWriteRead(writer, fs); */ - + // this, however, is too large and should not create a bloom // because Java can't create a contiguous array > MAX_INT writer = new StoreFile.Writer(fs, f, StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, - conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE); + conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE, + false); assertFalse(writer.hasBloom()); writer.close(); fs.delete(f, true); } - + public void testFlushTimeComparator() { assertOrdering(StoreFile.Comparators.FLUSH_TIME, mockStoreFile(true, 1000, -1, "/foo/123"), @@ -516,7 +517,7 @@ public class TestStoreFile extends HBaseTestCase { mockStoreFile(false, -1, 5, "/foo/2"), mockStoreFile(false, -1, 5, "/foo/3")); } - + /** * Assert that the given comparator orders the given storefiles in the * same way that they're passed. 
@@ -626,4 +627,144 @@ public class TestStoreFile extends HBaseTestCase { //scan.setTimeRange(27, 50); //assertTrue(!scanner.shouldSeek(scan, columns)); } + + public void testCacheOnWriteEvictOnClose() throws Exception { + Configuration conf = this.conf; + conf.setBoolean("hbase.rs.evictblocksonclose", false); + + // Find a home for our files + Path baseDir = new Path(new Path(this.testDir, "regionname"), + "twoCOWEOC"); + + // Grab the block cache and get the initial hit/miss counts + BlockCache bc = StoreFile.getBlockCache(conf); + assertNotNull(bc); + CacheStats cs = bc.getStats(); + long startHit = cs.getHitCount(); + long startMiss = cs.getMissCount(); + long startEvicted = cs.getEvictedCount(); + + // Let's write a StoreFile with three blocks, with cache on write off + conf.setBoolean("hbase.rs.cacheblocksonwrite", false); + Path pathCowOff = new Path(baseDir, "123456789"); + StoreFile.Writer writer = writeStoreFile(conf, pathCowOff, 3); + StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false); + LOG.debug(hsf.getPath().toString()); + + // Read this file, we should see 3 misses + StoreFile.Reader reader = hsf.createReader(); + reader.loadFileInfo(); + StoreFileScanner scanner = reader.getStoreFileScanner(true, true); + scanner.seek(KeyValue.LOWESTKEY); + while (scanner.next() != null); + assertEquals(startHit, cs.getHitCount()); + assertEquals(startMiss + 3, cs.getMissCount()); + assertEquals(startEvicted, cs.getEvictedCount()); + startMiss += 3; + scanner.close(); + reader.close(); + + // Now write a StoreFile with three blocks, with cache on write on + conf.setBoolean("hbase.rs.cacheblocksonwrite", true); + Path pathCowOn = new Path(baseDir, "123456788"); + writer = writeStoreFile(conf, pathCowOn, 3); + hsf = new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false); + + // Read this file, we should see 3 hits + reader = hsf.createReader(); + scanner = reader.getStoreFileScanner(true, true); + scanner.seek(KeyValue.LOWESTKEY); + while (scanner.next() != null); + assertEquals(startHit + 3, cs.getHitCount()); + assertEquals(startMiss, cs.getMissCount()); + assertEquals(startEvicted, cs.getEvictedCount()); + startHit += 3; + scanner.close(); + reader.close(); + + // Let's read back the two files to ensure the blocks exactly match + hsf = new StoreFile(this.fs, pathCowOff, true, conf, + StoreFile.BloomType.NONE, false); + StoreFile.Reader readerOne = hsf.createReader(); + readerOne.loadFileInfo(); + StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true); + scannerOne.seek(KeyValue.LOWESTKEY); + hsf = new StoreFile(this.fs, pathCowOn, true, conf, + StoreFile.BloomType.NONE, false); + StoreFile.Reader readerTwo = hsf.createReader(); + readerTwo.loadFileInfo(); + StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true); + scannerTwo.seek(KeyValue.LOWESTKEY); + KeyValue kv1 = null; + KeyValue kv2 = null; + while ((kv1 = scannerOne.next()) != null) { + kv2 = scannerTwo.next(); + assertTrue(kv1.equals(kv2)); + assertTrue(Bytes.equals(kv1.getBuffer(), kv2.getBuffer())); + } + assertNull(scannerTwo.next()); + assertEquals(startHit + 6, cs.getHitCount()); + assertEquals(startMiss, cs.getMissCount()); + assertEquals(startEvicted, cs.getEvictedCount()); + startHit += 6; + scannerOne.close(); + readerOne.close(); + scannerTwo.close(); + readerTwo.close(); + + // Let's close the first file with evict on close turned on + conf.setBoolean("hbase.rs.evictblocksonclose", true); + hsf = new 
StoreFile(this.fs, pathCowOff, true, conf, + StoreFile.BloomType.NONE, false); + reader = hsf.createReader(); + reader.close(); + + // We should have 3 new evictions + assertEquals(startHit, cs.getHitCount()); + assertEquals(startMiss, cs.getMissCount()); + assertEquals(startEvicted + 3, cs.getEvictedCount()); + startEvicted += 3; + + // Let's close the second file with evict on close turned off + conf.setBoolean("hbase.rs.evictblocksonclose", false); + hsf = new StoreFile(this.fs, pathCowOn, true, conf, + StoreFile.BloomType.NONE, false); + reader = hsf.createReader(); + reader.close(); + + // We expect no changes + assertEquals(startHit, cs.getHitCount()); + assertEquals(startMiss, cs.getMissCount()); + assertEquals(startEvicted, cs.getEvictedCount()); + } + + private StoreFile.Writer writeStoreFile(Configuration conf, Path path, + int numBlocks) + throws IOException { + // Let's put ~5 small KVs in each block, so let's make 5*numBlocks KVs + int numKVs = 5 * numBlocks; + List kvs = new ArrayList(numKVs); + byte [] b = Bytes.toBytes("x"); + int totalSize = 0; + for (int i=numKVs;i>0;i--) { + KeyValue kv = new KeyValue(b, b, b, i, b); + kvs.add(kv); + totalSize += kv.getLength(); + } + int blockSize = totalSize / numBlocks; + StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize, + HFile.DEFAULT_COMPRESSION_ALGORITHM, + conf, KeyValue.COMPARATOR, StoreFile.BloomType.NONE, 2000, + conf.getBoolean("hbase.rs.cacheblocksonwrite", false)); + // We'll write N-1 KVs to ensure we don't write an extra block + kvs.remove(kvs.size()-1); + for (KeyValue kv : kvs) { + writer.append(kv); + } + writer.appendMetadata(0, false); + writer.close(); + return writer; + } }
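
The two configuration flags wired in by this patch are "hbase.rs.cacheblocksonwrite" (read in Store when it creates new store file writers, default false) and "hbase.rs.evictblocksonclose" (read when a StoreFile opens its Reader, default true). A minimal sketch of enabling both, modeled on the new TestStoreFile test above and assuming only a standard HBase client Configuration (the surrounding table/store setup is not shown and is not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Sketch only: turn on cache-on-write and evict-on-close for store files.
    Configuration conf = HBaseConfiguration.create();
    // Cache each data block in the block cache as HFile.Writer finishes it.
    conf.setBoolean("hbase.rs.cacheblocksonwrite", true);
    // Evict a file's cached blocks when its HFile.Reader is closed (the default).
    conf.setBoolean("hbase.rs.evictblocksonclose", true);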