From 35616d9205e93bd200ce32249c8320579e6145a7 Mon Sep 17 00:00:00 2001 From: Ryan Rawson Date: Mon, 28 Jun 2010 06:02:49 +0000 Subject: [PATCH] HBASE-2501 Refactor StoreFile Code git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@958468 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/io/HalfStoreFileReader.java | 10 +- .../apache/hadoop/hbase/io/hfile/HFile.java | 17 +- .../hadoop/hbase/io/hfile/HFileScanner.java | 11 - .../mapreduce/LoadIncrementalHFiles.java | 56 +- .../MinorCompactingStoreScanner.java | 2 +- .../hadoop/hbase/regionserver/Store.java | 97 ++-- .../hadoop/hbase/regionserver/StoreFile.java | 526 ++++++++++-------- .../hbase/regionserver/StoreFileScanner.java | 29 +- .../hbase/regionserver/StoreScanner.java | 4 +- .../apache/hadoop/hbase/util/BloomFilter.java | 47 +- .../hadoop/hbase/util/ByteBloomFilter.java | 142 ++--- .../hbase/util/DynamicByteBloomFilter.java | 85 +-- .../regionserver/TestFSErrorsExposed.java | 58 +- .../hbase/regionserver/TestStoreFile.java | 84 ++- 15 files changed, 583 insertions(+), 586 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f5e1c3b6253..b3a54dfa1a0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -425,6 +425,7 @@ Release 0.21.0 - Unreleased HBASE-2787 PE is confused about flushCommits HBASE-2707 Can't recover from a dead ROOT server if any exceptions happens during log splitting + HBASE-2501 Refactor StoreFile Code IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 6b587665993..ed12e7abb49 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.SortedSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,9 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; @@ -155,11 +152,6 @@ public class HalfStoreFileReader extends StoreFile.Reader { return this.delegate.seekBefore(key, offset, length); } - public boolean shouldSeek(byte[] row, - final SortedSet columns) { - return this.delegate.shouldSeek(row, columns); - } - public boolean seekTo() throws IOException { if (top) { int r = this.delegate.seekTo(splitkey); @@ -209,7 +201,7 @@ public class HalfStoreFileReader extends StoreFile.Reader { return delegate.seekTo(key, offset, length); } - public Reader getReader() { + public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() { return this.delegate.getReader(); } diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index ce1c2408c7c..b2de7e4f389 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -455,7 +455,7 @@ public class HFile { appendFileInfo(this.fileinfo, k, v, true); } - FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v, + static 
FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v, final boolean checkPrefix) throws IOException { if (k == null || v == null) { @@ -1078,7 +1078,7 @@ public class HFile { } return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0]; } - + /** * @return the first row key, or null if the file is empty. * TODO move this to StoreFile after Ryan's patch goes in @@ -1122,7 +1122,7 @@ public class HFile { if (lastKey == null) return null; return KeyValue.createKeyValueFromKey(lastKey).getRow(); } - + /** * @return number of K entries in this HFile's filter. Returns KV count if no filter. */ @@ -1164,6 +1164,10 @@ public class HFile { } } + public String getName() { + return name; + } + /* * Implementation of {@link HFileScanner} interface. */ @@ -1248,11 +1252,6 @@ public class HFile { return true; } - public boolean shouldSeek(final byte[] row, - final SortedSet columns) { - return true; - } - public int seekTo(byte [] key) throws IOException { return seekTo(key, 0, key.length); } @@ -1706,7 +1705,7 @@ public class HFile { super(); } } - + /** * Return true if the given file info key is reserved for internal * use by HFile. diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index f5a5dc0ce0f..39e5600be7c 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -65,17 +65,6 @@ public interface HFileScanner { */ public boolean seekBefore(byte [] key) throws IOException; public boolean seekBefore(byte []key, int offset, int length) throws IOException; - /** - * Optimization for single key lookups. If the file has a filter, - * perform a lookup on the key. - * @param row the row to scan - * @param family the column family to scan - * @param columns the array of column qualifiers to scan - * @return False if the key definitely does not exist in this ScanFile - * @throws IOException - */ - public boolean shouldSeek(final byte[] row, - final SortedSet columns); /** * Positions this scanner at the start of the file. * @return False if empty file; i.e. 
a call to next would return false and diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 4212efa91e7..1183584bcd5 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -61,11 +61,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class); public static String NAME = "completebulkload"; - + public LoadIncrementalHFiles(Configuration conf) { super(conf); } - + public LoadIncrementalHFiles() { super(); } @@ -73,7 +73,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private void usage() { System.err.println("usage: " + NAME + - " /path/to/hfileoutputformat-output " + + " /path/to/hfileoutputformat-output " + "tablename"); } @@ -88,7 +88,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private static class LoadQueueItem { final byte[] family; final Path hfilePath; - + public LoadQueueItem(byte[] family, Path hfilePath) { this.family = family; this.hfilePath = hfilePath; @@ -102,17 +102,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private Deque discoverLoadQueue(Path hfofDir) throws IOException { FileSystem fs = hfofDir.getFileSystem(getConf()); - + if (!fs.exists(hfofDir)) { throw new FileNotFoundException("HFileOutputFormat dir " + - hfofDir + " not found"); + hfofDir + " not found"); } - + FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); if (familyDirStatuses == null) { throw new FileNotFoundException("No families found in " + hfofDir); } - + Deque ret = new LinkedList(); for (FileStatus stat : familyDirStatuses) { if (!stat.isDir()) { @@ -144,13 +144,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throws TableNotFoundException, IOException { HConnection conn = table.getConnection(); - + if (!conn.isTableAvailable(table.getTableName())) { - throw new TableNotFoundException("Table " + + throw new TableNotFoundException("Table " + Bytes.toStringBinary(table.getTableName()) + "is not currently available."); } - + Deque queue = null; try { queue = discoverLoadQueue(hfofDir); @@ -193,7 +193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } finally { hfr.close(); } - + LOG.info("Trying to load hfile=" + hfilePath + " first=" + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); @@ -202,7 +202,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("hfile " + hfilePath + " has no entries, skipping"); return; } - + // We use a '_' prefix which is ignored when walking directory trees // above. final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp"); @@ -217,8 +217,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { if (!hri.containsRange(first, last)) { LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. 
Splitting..."); - - HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); + + HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom"); Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top"); splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(), @@ -231,14 +231,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); return null; } - + byte[] regionName = location.getRegionInfo().getRegionName(); server.bulkLoadHFile(hfilePath.toString(), regionName, item.family); return null; } }); } - + /** * Split a storefile into a top and bottom half, maintaining * the metadata, recreating bloom filters, etc. @@ -251,11 +251,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // Open reader with no block cache, and not in-memory Reference topReference = new Reference(splitKey, Range.top); Reference bottomReference = new Reference(splitKey, Range.bottom); - + copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); } - + /** * Copy half of an HFile into a new HFile. */ @@ -265,15 +265,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throws IOException { FileSystem fs = inFile.getFileSystem(conf); HalfStoreFileReader halfReader = null; - HFile.Writer halfWriter = null; + StoreFile.Writer halfWriter = null; try { halfReader = new HalfStoreFileReader(fs, inFile, null, reference); Map fileInfo = halfReader.loadFileInfo(); - + int blocksize = familyDescriptor.getBlocksize(); Algorithm compression = familyDescriptor.getCompression(); BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); - + halfWriter = new StoreFile.Writer( fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR, bloomFilterType, 0); @@ -283,7 +283,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { KeyValue kv = scanner.getKeyValue(); halfWriter.append(kv); } while (scanner.next()); - + for (Map.Entry entry : fileInfo.entrySet()) { if (shouldCopyHFileMetaKey(entry.getKey())) { halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); @@ -292,9 +292,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } finally { if (halfWriter != null) halfWriter.close(); if (halfReader != null) halfReader.close(); - } + } } - + private static boolean shouldCopyHFileMetaKey(byte[] key) { return !HFile.isReservedFileInfoKey(key); } @@ -306,10 +306,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { usage(); return -1; } - + Path hfofDir = new Path(args[0]); HTable table = new HTable(args[1]); - + doBulkLoad(hfofDir, table); return 0; } @@ -317,5 +317,5 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new LoadIncrementalHFiles(), args); } - + } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java index 71f738e66ab..ddf5be07168 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java @@ -79,7 +79,7 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, 
InternalSca * @return True if more. * @throws IOException */ - public boolean next(HFile.Writer writer) throws IOException { + public boolean next(StoreFile.Writer writer) throws IOException { KeyValue row = heap.peek(); if (row == null) { close(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 5a2c1536083..9c720b141f6 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -59,14 +58,24 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; /** - * A Store holds a column family in a Region. Its a memstore and a set of zero - * or more StoreFiles, which stretch backwards over time. - * - *

There's no reason to consider append-logging at this level; all logging - * and locking is handled at the HRegion level. Store just provides - * services to manage sets of StoreFiles. One of the most important of those - * services is compaction services where files are aggregated once they pass - * a configurable threshold. + * A Store holds a column family in a Region. It's a memstore and a set of zero + * or more StoreFiles, which stretch backwards over time. + * + *

There's no reason to consider append-logging at this level; all logging + * and locking is handled at the HRegion level. Store just provides + * services to manage sets of StoreFiles. One of the most important of those + * services is compaction services where files are aggregated once they pass + * a configurable threshold. + * + *

The only thing having to do with logs that Store needs to deal with is + * the reconstructionLog. This is a segment of an HRegion's log that might + * NOT be present upon startup. If the param is NULL, there's nothing to do. + * If the param is non-NULL, we need to process the log to reconstruct + * a TreeMap that might not have been written to disk before the process + * died. + * + *

It's assumed that after this constructor returns, the reconstructionLog + * file will be deleted (by whoever has instantiated the Store). * *

Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. @@ -303,7 +312,10 @@ public class Store implements HeapSize { reader.loadFileInfo(); byte[] firstKey = reader.getFirstRowKey(); - byte[] lastKey = reader.getLastRowKey(); + byte[] lk = reader.getLastKey(); + byte[] lastKey = + (lk == null) ? null : + KeyValue.createKeyValueFromKey(lk).getRow(); LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) + " last=" + Bytes.toStringBinary(lastKey)); @@ -423,8 +435,8 @@ public class Store implements HeapSize { * @throws IOException */ private StoreFile internalFlushCache(final SortedSet set, - final long logCacheFlushId) - throws IOException { + final long logCacheFlushId) + throws IOException { StoreFile.Writer writer = null; long flushed = 0; // Don't flush if there are no entries. @@ -462,7 +474,7 @@ public class Store implements HeapSize { StoreFile sf = new StoreFile(this.fs, dstPath, blockcache, this.conf, this.family.getBloomFilterType(), this.inMemory); - Reader r = sf.createReader(); + StoreFile.Reader r = sf.createReader(); this.storeSize += r.length(); if(LOG.isInfoEnabled()) { LOG.info("Added " + sf + ", entries=" + r.getEntries() + @@ -601,7 +613,7 @@ public class Store implements HeapSize { LOG.warn("Path is null for " + file); return null; } - Reader r = file.getReader(); + StoreFile.Reader r = file.getReader(); if (r == null) { LOG.warn("StoreFile " + file + " has a null Reader"); return null; @@ -653,7 +665,7 @@ public class Store implements HeapSize { this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() + (references? ", hasReferences=true,": " ") + " into " + region.getTmpDir() + ", seqid=" + maxId); - HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId); + StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId); // Move the compaction into place. StoreFile sf = completeCompaction(filesToCompact, writer); if (LOG.isInfoEnabled()) { @@ -689,8 +701,7 @@ public class Store implements HeapSize { * @param dir * @throws IOException */ - private static long getLowestTimestamp(FileSystem fs, Path dir) - throws IOException { + private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException { FileStatus[] stats = fs.listStatus(dir); if (stats == null || stats.length == 0) { return 0l; @@ -716,8 +727,7 @@ public class Store implements HeapSize { * @param filesToCompact Files to compact. Can be null. * @return True if we should run a major compaction. */ - private boolean isMajorCompaction(final List filesToCompact) - throws IOException { + private boolean isMajorCompaction(final List filesToCompact) throws IOException { boolean result = false; if (filesToCompact == null || filesToCompact.isEmpty() || majorCompactionTime == 0) { @@ -758,9 +768,9 @@ public class Store implements HeapSize { * nothing made it through the compaction. 
* @throws IOException */ - private HFile.Writer compact(final List filesToCompact, - final boolean majorCompaction, final long maxId) - throws IOException { + private StoreFile.Writer compact(final List filesToCompact, + final boolean majorCompaction, final long maxId) + throws IOException { // calculate maximum key count after compaction (for blooms) int maxKeyCount = 0; for (StoreFile file : filesToCompact) { @@ -850,8 +860,8 @@ public class Store implements HeapSize { * @throws IOException */ private StoreFile completeCompaction(final List compactedFiles, - final HFile.Writer compactedFile) - throws IOException { + final StoreFile.Writer compactedFile) + throws IOException { // 1. Moving the new files into place -- if there is a new file (may not // be if all cells were expired or deleted). StoreFile result = null; @@ -907,7 +917,7 @@ public class Store implements HeapSize { // 4. Compute new store size this.storeSize = 0L; for (StoreFile hsf : this.storefiles) { - Reader r = hsf.getReader(); + StoreFile.Reader r = hsf.getReader(); if (r == null) { LOG.warn("StoreFile " + hsf + " has a null Reader"); continue; @@ -970,8 +980,7 @@ public class Store implements HeapSize { * @return Found keyvalue or null if none found. * @throws IOException */ - KeyValue getRowKeyAtOrBefore(final KeyValue kv) - throws IOException { + KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException { GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker( this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion()); this.lock.readLock().lock(); @@ -997,9 +1006,9 @@ public class Store implements HeapSize { * @throws IOException */ private void rowAtOrBeforeFromStoreFile(final StoreFile f, - final GetClosestRowBeforeTracker state) - throws IOException { - Reader r = f.getReader(); + final GetClosestRowBeforeTracker state) + throws IOException { + StoreFile.Reader r = f.getReader(); if (r == null) { LOG.warn("StoreFile " + f + " has a null Reader"); return; @@ -1049,8 +1058,9 @@ public class Store implements HeapSize { * @throws IOException */ private boolean seekToScanner(final HFileScanner scanner, - final KeyValue firstOnRow, final KeyValue firstKV) - throws IOException { + final KeyValue firstOnRow, + final KeyValue firstKV) + throws IOException { KeyValue kv = firstOnRow; // If firstOnRow < firstKV, set to firstKV if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV; @@ -1070,8 +1080,9 @@ public class Store implements HeapSize { * @throws IOException */ private boolean walkForwardInSingleRow(final HFileScanner scanner, - final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) - throws IOException { + final KeyValue firstOnRow, + final GetClosestRowBeforeTracker state) + throws IOException { boolean foundCandidate = false; do { KeyValue kv = scanner.getKeyValue(); @@ -1129,7 +1140,7 @@ public class Store implements HeapSize { return null; } } - Reader r = sf.getReader(); + StoreFile.Reader r = sf.getReader(); if (r == null) { LOG.warn("Storefile " + sf + " Reader is null"); continue; @@ -1141,7 +1152,7 @@ public class Store implements HeapSize { largestSf = sf; } } - HFile.Reader r = largestSf.getReader(); + StoreFile.Reader r = largestSf.getReader(); if (r == null) { LOG.warn("Storefile " + largestSf + " Reader is null"); return null; @@ -1217,7 +1228,7 @@ public class Store implements HeapSize { long getStorefilesSize() { long size = 0; for (StoreFile s: storefiles) { - Reader r = s.getReader(); + StoreFile.Reader r = s.getReader(); if (r == null) 
{ LOG.warn("StoreFile " + s + " has a null Reader"); continue; @@ -1233,7 +1244,7 @@ public class Store implements HeapSize { long getStorefilesIndexSize() { long size = 0; for (StoreFile s: storefiles) { - Reader r = s.getReader(); + StoreFile.Reader r = s.getReader(); if (r == null) { LOG.warn("StoreFile " + s + " has a null Reader"); continue; @@ -1284,7 +1295,7 @@ public class Store implements HeapSize { * @throws IOException */ static boolean getClosest(final HFileScanner s, final KeyValue kv) - throws IOException { + throws IOException { // Pass offsets to key content of a KeyValue; thats whats in the hfile index. int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); if (result < 0) { @@ -1311,7 +1322,7 @@ public class Store implements HeapSize { * @throws IOException */ public void get(Get get, NavigableSet columns, List result) - throws IOException { + throws IOException { KeyComparator keyComparator = this.comparator.getRawComparator(); // Column matching and version enforcement @@ -1333,7 +1344,7 @@ public class Store implements HeapSize { // Get storefiles for this store List storefileScanners = new ArrayList(); for (StoreFile sf : Iterables.reverse(this.storefiles)) { - HFile.Reader r = sf.getReader(); + StoreFile.Reader r = sf.getReader(); if (r == null) { LOG.warn("StoreFile " + sf + " has a null Reader"); continue; @@ -1367,8 +1378,8 @@ public class Store implements HeapSize { * @throws IOException */ public long updateColumnValue(byte [] row, byte [] f, - byte [] qualifier, long newValue) - throws IOException { + byte [] qualifier, long newValue) + throws IOException { List result = new ArrayList(); KeyComparator keyComparator = this.comparator.getRawComparator(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 62135dcf079..674726e2fe0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.ByteBloomFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Hash; +import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Function; @@ -56,8 +56,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.SortedSet; import java.util.Random; +import java.util.SortedSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -71,18 +71,53 @@ import java.util.regex.Pattern; * sitting in the Filesystem. To refer to it, create a StoreFile instance * passing filesystem and path. To read, call {@link #createReader()}. *
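As a rough usage sketch (not code from this patch), the read path described in the class comment above — construct a StoreFile, then call createReader() — looks roughly like the following. It assumes code living in the org.apache.hadoop.hbase.regionserver package (the StoreFile constructor and StoreFileScanner are package-private), and the path and flags are invented for illustration.

```java
// Hypothetical read-side sketch; the path and flag values are invented.
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

public class StoreFileReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path p = new Path("/tmp/example-storefile");          // made-up path

    // no block cache, no bloom, not in-memory -- arbitrary for the example
    StoreFile sf = new StoreFile(fs, p, false, conf,
        StoreFile.BloomType.NONE, false);
    StoreFile.Reader r = sf.createReader();
    try {
      // The refactor hands out a StoreFileScanner instead of a raw HFileScanner.
      StoreFileScanner scanner = r.getStoreFileScanner(true, false);
      scanner.seek(KeyValue.createFirstOnRow(new byte[0])); // seek to the start
      for (KeyValue kv = scanner.next(); kv != null; kv = scanner.next()) {
        System.out.println(kv);
      }
    } finally {
      r.close();
    }
  }
}
```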

StoreFiles may also reference store files in another Store. + * + * The reason for this weird pattern where you use a different instance for the + * writer and a reader is that we write once but read a lot more. */ public class StoreFile { static final Log LOG = LogFactory.getLog(StoreFile.class.getName()); - private static final String HFILE_CACHE_SIZE_KEY = "hfile.block.cache.size"; + // Config keys. + static final String IO_STOREFILE_BLOOM_ERROR_RATE = "io.storefile.bloom.error.rate"; + static final String IO_STOREFILE_BLOOM_MAX_FOLD = "io.storefile.bloom.max.fold"; + static final String IO_STOREFILE_BLOOM_ENABLED = "io.storefile.bloom.enabled"; + static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size"; - private static BlockCache hfileBlockCache = null; + public static enum BloomType { + /** + * Bloomfilters disabled + */ + NONE, + /** + * Bloom enabled with Table row as Key + */ + ROW, + /** + * Bloom enabled with Table row & column (family+qualifier) as Key + */ + ROWCOL + } + // Keys for fileinfo values in HFile + /** Max Sequence ID in FileInfo */ + public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); + /** Major compaction flag in FileInfo */ + public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); + /** Bloom filter Type in FileInfo */ + static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + + /** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */ + static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; + /** Meta data block name for bloom filter data (ie: bloom bits) */ + static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; // Make default block size for StoreFiles 8k while testing. TODO: FIX! // Need to make it 8k for testing. public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; + + private static BlockCache hfileBlockCache = null; + private final FileSystem fs; // This file's path. private final Path path; @@ -96,15 +131,9 @@ public class StoreFile { private boolean inMemory; // Keys for metadata stored in backing HFile. - /** Constant for the max sequence ID meta */ - public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); // Set when we obtain a Reader. private long sequenceid = -1; - /** Constant for major compaction meta */ - public static final byte [] MAJOR_COMPACTION_KEY = - Bytes.toBytes("MAJOR_COMPACTION_KEY"); - // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; @@ -115,12 +144,6 @@ public class StoreFile { public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP"); - - static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; - static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; - static final byte[] BLOOM_FILTER_TYPE_KEY = - Bytes.toBytes("BLOOM_FILTER_TYPE"); - /** * Map of the metadata entries in the corresponding HFile */ @@ -134,7 +157,8 @@ public class StoreFile { private static final Pattern REF_NAME_PARSER = Pattern.compile("^(\\d+)(?:\\.(.+))?$"); - private volatile StoreFile.Reader reader; + // StoreFile.Reader + private volatile Reader reader; // Used making file ids. private final static Random rand = new Random(); @@ -153,9 +177,13 @@ public class StoreFile { * @param bt The bloom type to use for this store file * @throws IOException When opening the reader fails. 
*/ - StoreFile(final FileSystem fs, final Path p, final boolean blockcache, - final Configuration conf, final BloomType bt, final boolean inMemory) - throws IOException { + StoreFile(final FileSystem fs, + final Path p, + final boolean blockcache, + final Configuration conf, + final BloomType bt, + final boolean inMemory) + throws IOException { this.conf = conf; this.fs = fs; this.path = p; @@ -167,7 +195,7 @@ public class StoreFile { } // ignore if the column family config says "no bloom filter" // even if there is one in the hfile. - if (conf.getBoolean("io.hfile.bloom.enabled", true)) { + if (conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) { this.bloomType = bt; } else { this.bloomType = BloomType.NONE; @@ -307,11 +335,11 @@ public class StoreFile { public static synchronized BlockCache getBlockCache(Configuration conf) { if (hfileBlockCache != null) return hfileBlockCache; - float cachePercentage = conf.getFloat(HFILE_CACHE_SIZE_KEY, 0.0f); + float cachePercentage = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); // There should be a better way to optimize this. But oh well. if (cachePercentage == 0L) return null; if (cachePercentage > 1.0) { - throw new IllegalArgumentException(HFILE_CACHE_SIZE_KEY + + throw new IllegalArgumentException(HFILE_BLOCK_CACHE_SIZE_KEY + " must be between 0.0 and 1.0, not > 1.0"); } @@ -337,18 +365,20 @@ public class StoreFile { * @throws IOException * @see #closeReader() */ - private StoreFile.Reader open() - throws IOException { + private Reader open() throws IOException { + if (this.reader != null) { throw new IllegalAccessError("Already open"); } + if (isReference()) { this.reader = new HalfStoreFileReader(this.fs, this.referencePath, getBlockCache(), this.reference); } else { - this.reader = new StoreFile.Reader(this.fs, this.path, getBlockCache(), + this.reader = new Reader(this.fs, this.path, getBlockCache(), this.inMemory); } + // Load up indices and fileinfo. metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); // Read in our metadata. @@ -365,8 +395,8 @@ public class StoreFile { this.sequenceid += 1; } } - } + b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); @@ -388,7 +418,7 @@ public class StoreFile { * @return Reader for StoreFile. creates if necessary * @throws IOException */ - public StoreFile.Reader createReader() throws IOException { + public Reader createReader() throws IOException { if (this.reader == null) { this.reader = open(); } @@ -400,7 +430,7 @@ public class StoreFile { * @throws IOException * @see {@link #createReader()} */ - public StoreFile.Reader getReader() { + public Reader getReader() { return this.reader; } @@ -455,9 +485,11 @@ public class StoreFile { * @return True if succeeded. * @throws IOException */ - public static Path rename(final FileSystem fs, final Path src, - final Path tgt) - throws IOException { + public static Path rename(final FileSystem fs, + final Path src, + final Path tgt) + throws IOException { + if (!fs.exists(src)) { throw new FileNotFoundException(src.toString()); } @@ -469,19 +501,20 @@ public class StoreFile { /** * Get a store file writer. Client is responsible for closing file when done. - * If metadata, add BEFORE closing using - * {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}. + * * @param fs * @param dir Path to family directory. Makes the directory if doesn't exist. * Creates a file with a unique name in this directory. 
* @param blocksize size per filesystem block - * @return HFile.Writer + * @return StoreFile.Writer * @throws IOException */ - public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir, - final int blocksize) - throws IOException { - return createWriter(fs,dir,blocksize,null,null,null,BloomType.NONE,0); + public static Writer createWriter(final FileSystem fs, + final Path dir, + final int blocksize) + throws IOException { + + return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0); } /** @@ -499,20 +532,25 @@ public class StoreFile { * @return HFile.Writer * @throws IOException */ - public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir, - final int blocksize, final Compression.Algorithm algorithm, - final KeyValue.KVComparator c, final Configuration conf, - BloomType bloomType, int maxKeySize) - throws IOException { + public static StoreFile.Writer createWriter(final FileSystem fs, + final Path dir, + final int blocksize, + final Compression.Algorithm algorithm, + final KeyValue.KVComparator c, + final Configuration conf, + BloomType bloomType, + int maxKeySize) + throws IOException { + if (!fs.exists(dir)) { fs.mkdirs(dir); } Path path = getUniqueFile(fs, dir); - if(conf == null || !conf.getBoolean("io.hfile.bloom.enabled", true)) { + if(conf == null || !conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) { bloomType = BloomType.NONE; } - return new StoreFile.Writer(fs, path, blocksize, + return new Writer(fs, path, blocksize, algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm, conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize); } @@ -523,7 +561,7 @@ public class StoreFile { * @return random filename inside passed dir */ public static Path getUniqueFile(final FileSystem fs, final Path dir) - throws IOException { + throws IOException { if (!fs.getFileStatus(dir).isDir()) { throw new IOException("Expecting " + dir.toString() + " to be a directory"); @@ -539,7 +577,7 @@ public class StoreFile { * @throws IOException */ static Path getRandomFilename(final FileSystem fs, final Path dir) - throws IOException { + throws IOException { return getRandomFilename(fs, dir, null); } @@ -551,9 +589,10 @@ public class StoreFile { * @return Path to a file that doesn't exist at time of this invocation. * @throws IOException */ - static Path getRandomFilename(final FileSystem fs, final Path dir, - final String suffix) - throws IOException { + static Path getRandomFilename(final FileSystem fs, + final Path dir, + final String suffix) + throws IOException { long id = -1; Path p = null; do { @@ -564,8 +603,11 @@ public class StoreFile { return p; } - /* + /** * Write out a split reference. + * + * Package local so it doesnt leak out of regionserver. + * * @param fs * @param splitDir Presumes path format is actually * SOME_DIRECTORY/REGIONNAME/FAMILY. @@ -575,9 +617,12 @@ public class StoreFile { * @return Path to created reference. * @throws IOException */ - static Path split(final FileSystem fs, final Path splitDir, - final StoreFile f, final byte [] splitRow, final Reference.Range range) - throws IOException { + static Path split(final FileSystem fs, + final Path splitDir, + final StoreFile f, + final byte [] splitRow, + final Reference.Range range) + throws IOException { // A reference to the bottom half of the hsf store file. Reference r = new Reference(splitRow, range); // Add the referred-to regions name as a dot separated suffix. 
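A hedged write-side counterpart to the read sketch earlier (again, not code from this patch): it uses the createWriter() overloads shown above plus the renamed io.storefile.bloom.* configuration keys this patch introduces. The directory, cell values, sequence id and expected key count are invented for illustration.

```java
// Hypothetical write-side sketch (same package as StoreFile); values invented.
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class StoreFileWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // These keys replace the old io.hfile.bloom.* names in this patch.
    conf.setBoolean("io.storefile.bloom.enabled", true);
    conf.setFloat("io.storefile.bloom.error.rate", 0.01f);
    conf.setInt("io.storefile.bloom.max.fold", 7);

    FileSystem fs = FileSystem.getLocal(conf);
    Path familyDir = new Path("/tmp/example-family");     // made-up directory

    // null compression/comparator fall back to the defaults inside createWriter.
    StoreFile.Writer writer = StoreFile.createWriter(fs, familyDir,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL, null, null, conf,
        StoreFile.BloomType.ROW, 2000 /* expected keys, sizes the bloom */);
    try {
      writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
          Bytes.toBytes("q"), System.currentTimeMillis(), Bytes.toBytes("v")));
      writer.appendMetadata(42L /* max sequence id */, false /* not a major compaction */);
    } finally {
      writer.close();  // close() also writes out the bloom meta/data blocks
    }
    System.out.println("Wrote " + writer.getPath());
  }
}
```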
@@ -591,164 +636,19 @@ public class StoreFile { return r.write(fs, p); } - public static enum BloomType { - /** - * Bloomfilters disabled - */ - NONE, - /** - * Bloom enabled with Table row as Key - */ - ROW, - /** - * Bloom enabled with Table row & column (family+qualifier) as Key - */ - ROWCOL - } /** - * + * A StoreFile writer. Use this to read/write HBase Store Files. It is package + * local because it is an implementation detail of the HBase regionserver. */ - public static class Reader extends HFile.Reader { - /** Bloom Filter class. Caches only meta, pass in data */ - protected BloomFilter bloomFilter = null; - /** Type of bloom filter (e.g. ROW vs ROWCOL) */ - protected BloomType bloomFilterType; - - public Reader(FileSystem fs, Path path, BlockCache cache, - boolean inMemory) - throws IOException { - super(fs, path, cache, inMemory); - } - - public Reader(final FSDataInputStream fsdis, final long size, - final BlockCache cache, final boolean inMemory) { - super(fsdis,size,cache,inMemory); - bloomFilterType = BloomType.NONE; - } - - @Override - public Map loadFileInfo() - throws IOException { - Map fi = super.loadFileInfo(); - - byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY); - if (b != null) { - bloomFilterType = BloomType.valueOf(Bytes.toString(b)); - } - - return fi; - } - - /** - * Load the bloom filter for this HFile into memory. - * Assumes the HFile has already been loaded - */ - public void loadBloomfilter() { - if (this.bloomFilter != null) { - return; // already loaded - } - - // see if bloom filter information is in the metadata - try { - ByteBuffer b = getMetaBlock(BLOOM_FILTER_META_KEY, false); - if (b != null) { - if (bloomFilterType == BloomType.NONE) { - throw new IOException("valid bloom filter type not found in FileInfo"); - } - this.bloomFilter = new ByteBloomFilter(b); - LOG.info("Loaded " + (bloomFilterType==BloomType.ROW? "row":"col") - + " bloom filter metadata for " + name); - } - } catch (IOException e) { - LOG.error("Error reading bloom filter meta -- proceeding without", e); - this.bloomFilter = null; - } catch (IllegalArgumentException e) { - LOG.error("Bad bloom filter meta -- proceeding without", e); - this.bloomFilter = null; - } - } - - BloomFilter getBloomFilter() { - return this.bloomFilter; - } - - /** - * @return bloom type information associated with this store file - */ - public BloomType getBloomFilterType() { - return this.bloomFilterType; - } - - @Override - public int getFilterEntries() { - return (this.bloomFilter != null) ? 
this.bloomFilter.getKeyCount() - : super.getFilterEntries(); - } - - @Override - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { - return new Scanner(this, cacheBlocks, pread); - } - - protected class Scanner extends HFile.Reader.Scanner { - public Scanner(Reader r, boolean cacheBlocks, final boolean pread) { - super(r, cacheBlocks, pread); - } - - @Override - public boolean shouldSeek(final byte[] row, - final SortedSet columns) { - if (bloomFilter == null) { - return true; - } - - byte[] key; - switch(bloomFilterType) { - case ROW: - key = row; - break; - case ROWCOL: - if (columns.size() == 1) { - byte[] col = columns.first(); - key = Bytes.add(row, col); - break; - } - //$FALL-THROUGH$ - default: - return true; - } - - try { - ByteBuffer bloom = getMetaBlock(BLOOM_FILTER_DATA_KEY, true); - if (bloom != null) { - return bloomFilter.contains(key, bloom); - } - } catch (IOException e) { - LOG.error("Error reading bloom filter data -- proceeding without", - e); - bloomFilter = null; - } catch (IllegalArgumentException e) { - LOG.error("Bad bloom filter data -- proceeding without", e); - bloomFilter = null; - } - - return true; - } - - } - } - - /** - * - */ - public static class Writer extends HFile.Writer { + public static class Writer { private final BloomFilter bloomFilter; private final BloomType bloomType; private KVComparator kvComparator; private KeyValue lastKv = null; private byte[] lastByteArray = null; + protected HFile.Writer writer; /** * Creates an HFile.Writer that also write helpful meta data. * @param fs file system to write to @@ -764,14 +664,14 @@ public class StoreFile { public Writer(FileSystem fs, Path path, int blocksize, Compression.Algorithm compress, final Configuration conf, final KVComparator comparator, BloomType bloomType, int maxKeys) - throws IOException { - super(fs, path, blocksize, compress, comparator.getRawComparator()); + throws IOException { + writer = new HFile.Writer(fs, path, blocksize, compress, comparator.getRawComparator()); this.kvComparator = comparator; if (bloomType != BloomType.NONE && conf != null) { - float err = conf.getFloat("io.hfile.bloom.error.rate", (float)0.01); - int maxFold = conf.getInt("io.hfile.bloom.max.fold", 7); + float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01); + int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7); this.bloomFilter = new ByteBloomFilter(maxKeys, err, Hash.getHashType(conf), maxFold); @@ -790,16 +690,13 @@ public class StoreFile { * @param majorCompaction True if this file is product of a major compaction * @throws IOException problem writing to FS */ - public void appendMetadata(final long maxSequenceId, - final boolean majorCompaction) + public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) throws IOException { - appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); } - @Override - public void append(final KeyValue kv) - throws IOException { + public void append(final KeyValue kv) throws IOException { if (this.bloomFilter != null) { // only add to the bloom filter on a new, unique key boolean newKey = true; @@ -846,28 +743,28 @@ public class StoreFile { this.lastKv = kv; } } - super.append(kv); + writer.append(kv); } - @Override - public void append(final byte [] key, final byte [] value) - 
throws IOException { + public Path getPath() { + return this.writer.getPath(); + } + + public void append(final byte [] key, final byte [] value) throws IOException { if (this.bloomFilter != null) { // only add to the bloom filter on a new row - if(this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) { + if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) { this.bloomFilter.add(key); this.lastByteArray = key; } } - super.append(key, value); + writer.append(key, value); } - @Override - public void close() - throws IOException { + public void close() throws IOException { // make sure we wrote something to the bloom before adding it if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) { - bloomFilter.finalize(); + bloomFilter.compactBloom(); if (this.bloomFilter.getMaxKeys() > 0) { int b = this.bloomFilter.getByteSize(); int k = this.bloomFilter.getKeyCount(); @@ -876,13 +773,184 @@ public class StoreFile { k + "/" + m + " (" + NumberFormat.getPercentInstance().format( ((double)k) / ((double)m)) + ")"); } - appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter()); - appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter()); - appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); + writer.appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter()); + writer.appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter()); + writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); } - super.close(); + writer.close(); } + public void appendFileInfo(byte[] key, byte[] value) throws IOException { + writer.appendFileInfo(key, value); + } + } + + /** + * Reader for a StoreFile. + */ + public static class Reader { + static final Log LOG = LogFactory.getLog(Reader.class.getName()); + + protected BloomFilter bloomFilter = null; + protected BloomType bloomFilterType; + private final HFile.Reader reader; + + public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory) + throws IOException { + reader = new HFile.Reader(fs, path, blockCache, inMemory); + bloomFilterType = BloomType.NONE; + } + + public RawComparator getComparator() { + return reader.getComparator(); + } + + /** + * Get a scanner to scan over this StoreFile. + * + * @param cacheBlocks should this scanner cache blocks? + * @param pread use pread (for highly concurrent small readers) + * @return a scanner + */ + public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) { + return new StoreFileScanner(this, getScanner(cacheBlocks, pread)); + } + + /** + * Warning: Do not write further code which depends on this call. Instead + * use getStoreFileScanner() which uses the StoreFileScanner class/interface + * which is the preferred way to scan a store with higher level concepts. + * + * @param cacheBlocks should we cache the blocks? 
+ * @param pread use pread (for concurrent small readers) + * @return the underlying HFileScanner + */ + @Deprecated + public HFileScanner getScanner(boolean cacheBlocks, boolean pread) { + return reader.getScanner(cacheBlocks, pread); + } + + public void close() throws IOException { + reader.close(); + } + + public boolean shouldSeek(final byte[] row, + final SortedSet columns) { + + if (this.bloomFilter == null) { + return true; + } + + byte[] key; + switch (this.bloomFilterType) { + case ROW: + key = row; + break; + case ROWCOL: + if (columns.size() == 1) { + byte[] col = columns.first(); + key = Bytes.add(row, col); + break; + } + //$FALL-THROUGH$ + default: + return true; + } + + try { + ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true); + if (bloom != null) { + return this.bloomFilter.contains(key, bloom); + } + } catch (IOException e) { + LOG.error("Error reading bloom filter data -- proceeding without", + e); + setBloomFilterFaulty(); + } catch (IllegalArgumentException e) { + LOG.error("Bad bloom filter data -- proceeding without", e); + setBloomFilterFaulty(); + } + + return true; + } + + public Map loadFileInfo() throws IOException { + Map fi = reader.loadFileInfo(); + + byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY); + if (b != null) { + bloomFilterType = BloomType.valueOf(Bytes.toString(b)); + } + + return fi; + } + + public void loadBloomfilter() { + if (this.bloomFilter != null) { + return; // already loaded + } + + try { + ByteBuffer b = reader.getMetaBlock(BLOOM_FILTER_META_KEY, false); + if (b != null) { + if (bloomFilterType == BloomType.NONE) { + throw new IOException("valid bloom filter type not found in FileInfo"); + } + + + this.bloomFilter = new ByteBloomFilter(b); + LOG.info("Loaded " + (bloomFilterType== BloomType.ROW? "row":"col") + + " bloom filter metadata for " + reader.getName()); + } + } catch (IOException e) { + LOG.error("Error reading bloom filter meta -- proceeding without", e); + this.bloomFilter = null; + } catch (IllegalArgumentException e) { + LOG.error("Bad bloom filter meta -- proceeding without", e); + this.bloomFilter = null; + } + } + + public int getFilterEntries() { + return (this.bloomFilter != null) ? 
this.bloomFilter.getKeyCount() + : reader.getFilterEntries(); + } + + public ByteBuffer getMetaBlock(String bloomFilterDataKey, boolean cacheBlock) throws IOException { + return reader.getMetaBlock(bloomFilterDataKey, cacheBlock); + } + + public void setBloomFilterFaulty() { + bloomFilter = null; + } + + public byte[] getLastKey() { + return reader.getLastKey(); + } + + public byte[] midkey() throws IOException { + return reader.midkey(); + } + + public long length() { + return reader.length(); + } + + public int getEntries() { + return reader.getEntries(); + } + + public byte[] getFirstKey() { + return reader.getFirstKey(); + } + + public long indexSize() { + return reader.indexSize(); + } + + public BloomType getBloomFilterType() { + return this.bloomFilterType; + } } /** diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 3541b3b5332..83a4acb90ab 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -24,28 +24,31 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.SortedSet; /** - * A KeyValue scanner that iterates over a single HFile + * KeyValueScanner adaptor over the Reader. It also provides hooks into + * bloom filter things. */ class StoreFileScanner implements KeyValueScanner { static final Log LOG = LogFactory.getLog(Store.class); - private HFileScanner hfs; + // the reader it comes from: + private final StoreFile.Reader reader; + private final HFileScanner hfs; private KeyValue cur = null; /** * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ - private StoreFileScanner(HFileScanner hfs) { + public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) { + this.reader = reader; this.hfs = hfs; } @@ -60,16 +63,12 @@ class StoreFileScanner implements KeyValueScanner { List scanners = new ArrayList(filesToCompact.size()); for (StoreFile file : filesToCompact) { - Reader r = file.createReader(); - scanners.add(new StoreFileScanner(r.getScanner(cacheBlocks, usePread))); + StoreFile.Reader r = file.createReader(); + scanners.add(r.getStoreFileScanner(cacheBlocks, usePread)); } return scanners; } - - public HFileScanner getHFileScanner() { - return this.hfs; - } - + public String toString() { return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]"; } @@ -131,4 +130,10 @@ class StoreFileScanner implements KeyValueScanner { // Seeked to the exact key return true; } + + // Bloom filter hook. 
+ public boolean shouldSeek(final byte[] row, + final SortedSet columns) { + return reader.shouldSeek(row, columns); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7680a222cb5..42423e7eead 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -155,9 +155,9 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb new ArrayList(sfScanners.size()+1); // exclude scan files that have failed file filters - for(StoreFileScanner sfs : sfScanners) { + for (StoreFileScanner sfs : sfScanners) { if (isGet && - !sfs.getHFileScanner().shouldSeek(scan.getStartRow(), columns)) { + !sfs.shouldSeek(scan.getStartRow(), columns)) { continue; // exclude this hfs } scanners.add(sfs); diff --git a/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java b/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java index 42e816ac837..f1003662be0 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java +++ b/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java @@ -1,5 +1,5 @@ -/** - * Copyright 2009 The Apache Software Foundation +/* + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,38 +19,37 @@ */ package org.apache.hadoop.hbase.util; -import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.hadoop.io.Writable; +import java.nio.ByteBuffer; + /** * Defines the general behavior of a bloom filter. *
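The StoreScanner hunk above is where the relocated bloom check now applies on the Get path: each store file's scanner is asked shouldSeek(row, columns) and is dropped when its bloom rules the key out. A minimal stand-alone rendering of that pruning step (not code from this patch; row and qualifier names are invented):

```java
// Hypothetical sketch of the Get-path pruning shown in the StoreScanner hunk;
// assumes the org.apache.hadoop.hbase.regionserver package for visibility.
package org.apache.hadoop.hbase.regionserver;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.hbase.util.Bytes;

public class BloomPruningSketch {
  /** Keep only the scanners whose bloom filter may contain the Get's key. */
  static List<StoreFileScanner> pruneForGet(List<StoreFileScanner> sfScanners,
      byte[] row, SortedSet<byte[]> columns) {
    List<StoreFileScanner> kept = new ArrayList<StoreFileScanner>();
    for (StoreFileScanner sfs : sfScanners) {
      // ROW blooms test the row alone; ROWCOL blooms test row+qualifier and
      // only apply when exactly one qualifier is asked for.  A bloom miss
      // means the key is definitely absent from that file, so skip it.
      if (!sfs.shouldSeek(row, columns)) {
        continue;
      }
      kept.add(sfs);
    }
    return kept;
  }

  public static void main(String[] args) {
    SortedSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    columns.add(Bytes.toBytes("q1"));   // invented qualifier
    // pruneForGet would be fed the scanners obtained from each
    // StoreFile.Reader.getStoreFileScanner(...) for the store's files.
  }
}
```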

- * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by + * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by * the networking research community in the past decade thanks to the bandwidth efficiencies that it - * offers for the transmission of set membership information between networked hosts. A sender encodes - * the information into a bit vector, the Bloom filter, that is more compact than a conventional - * representation. Computation and space costs for construction are linear in the number of elements. - * The receiver uses the filter to test whether various elements are members of the set. Though the - * filter will occasionally return a false positive, it will never return a false negative. When creating - * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. - * + * offers for the transmission of set membership information between networked hosts. A sender encodes + * the information into a bit vector, the Bloom filter, that is more compact than a conventional + * representation. Computation and space costs for construction are linear in the number of elements. + * The receiver uses the filter to test whether various elements are members of the set. Though the + * filter will occasionally return a false positive, it will never return a false negative. When creating + * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. + * *

* Originally created by * European Commission One-Lab Project 034819. - * + * *

* It must be extended in order to define the real behavior. */ public interface BloomFilter { - /** - * Allocate memory for the bloom filter data. Note that bloom data isn't - * allocated by default because it can grow large & reads would be better + /** + * Allocate memory for the bloom filter data. Note that bloom data isn't + * allocated by default because it can grow large & reads would be better * managed by the LRU cache. */ void allocBloom(); - + /** * Add the specified binary to the bloom filter. * @@ -86,7 +85,7 @@ public interface BloomFilter { * @return true if matched by bloom, false if not */ boolean contains(byte [] buf, int offset, int length, ByteBuffer bloom); - + /** * @return The number of keys added to the bloom */ @@ -97,16 +96,16 @@ public interface BloomFilter { * to maintain the desired error rate */ public int getMaxKeys(); - + /** - * Size of the bloom, in bytes + * @return Size of the bloom, in bytes */ public int getByteSize(); - + /** - * Finalize the bloom before writing metadata & data to disk + * Compact the bloom before writing metadata & data to disk */ - void finalize(); + void compactBloom(); /** * Get a writable interface into bloom filter meta data. diff --git a/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java b/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java index c6bb358f970..35190f08926 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java +++ b/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java @@ -1,35 +1,6 @@ -/** +/* + * Copyright 2010 The Apache Software Foundation * - * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) - * All rights reserved. - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the distribution. - * - Neither the name of the University Catholique de Louvain - UCL - * nor the names of its contributors may be used to endorse or - * promote products derived from this software without specific prior - * written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS - * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE - * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, - * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -48,41 +19,38 @@ */ package org.apache.hadoop.hbase.util; - -import java.io.DataOutput; -import java.io.DataInput; -import java.io.IOException; -import java.lang.Math; -import java.nio.ByteBuffer; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.bloom.Filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; /** * Implements a Bloom filter, as defined by Bloom in 1970. *

- * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by + * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by * the networking research community in the past decade thanks to the bandwidth efficiencies that it - * offers for the transmission of set membership information between networked hosts. A sender encodes - * the information into a bit vector, the Bloom filter, that is more compact than a conventional - * representation. Computation and space costs for construction are linear in the number of elements. - * The receiver uses the filter to test whether various elements are members of the set. Though the - * filter will occasionally return a false positive, it will never return a false negative. When creating - * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. - * + * offers for the transmission of set membership information between networked hosts. A sender encodes + * the information into a bit vector, the Bloom filter, that is more compact than a conventional + * representation. Computation and space costs for construction are linear in the number of elements. + * The receiver uses the filter to test whether various elements are members of the set. Though the + * filter will occasionally return a false positive, it will never return a false negative. When creating + * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. + * *
<p>
- * Originally created by + * Originally inspired by * European Commission One-Lab Project 034819. - * + * * @see BloomFilter The general behavior of a filter - * + * * @see Space/Time Trade-Offs in Hash Coding with Allowable Errors */ public class ByteBloomFilter implements BloomFilter { /** Current file format version */ public static final int VERSION = 1; - + /** Bytes (B) in the array */ protected int byteSize; /** Number of hash functions */ @@ -97,7 +65,7 @@ public class ByteBloomFilter implements BloomFilter { protected int maxKeys; /** Bloom bits */ protected ByteBuffer bloom; - + /** Bit-value lookup array to prevent doing the same work over and over */ private static final byte [] bitvals = { (byte) 0x01, @@ -115,11 +83,11 @@ public class ByteBloomFilter implements BloomFilter { * @param meta stored bloom meta data * @throws IllegalArgumentException meta data is invalid */ - public ByteBloomFilter(ByteBuffer meta) - throws IllegalArgumentException { + public ByteBloomFilter(ByteBuffer meta) + throws IllegalArgumentException { int version = meta.getInt(); if (version != VERSION) throw new IllegalArgumentException("Bad version"); - + this.byteSize = meta.getInt(); this.hashCount = meta.getInt(); this.hashType = meta.getInt(); @@ -136,30 +104,30 @@ public class ByteBloomFilter implements BloomFilter { * @param maxKeys Maximum expected number of keys that will be stored in this bloom * @param errorRate Desired false positive error rate. Lower rate = more storage required * @param hashType Type of hash function to use - * @param foldFactor When finished adding entries, you may be able to 'fold' - * this bloom to save space. Tradeoff potentially excess bytes in bloom for + * @param foldFactor When finished adding entries, you may be able to 'fold' + * this bloom to save space. Tradeoff potentially excess bytes in bloom for * ability to fold if keyCount is exponentially greater than maxKeys. * @throws IllegalArgumentException */ public ByteBloomFilter(int maxKeys, float errorRate, int hashType, int foldFactor) - throws IllegalArgumentException { - /* - * Bloom filters are very sensitive to the number of elements inserted - * into them. For HBase, the number of entries depends on the size of the - * data stored in the column. Currently the default region size is 256MB, - * so entry count ~= 256MB / (average value size for column). Despite - * this rule of thumb, there is no efficient way to calculate the entry - * count after compactions. Therefore, it is often easier to use a + throws IllegalArgumentException { + /* + * Bloom filters are very sensitive to the number of elements inserted + * into them. For HBase, the number of entries depends on the size of the + * data stored in the column. Currently the default region size is 256MB, + * so entry count ~= 256MB / (average value size for column). Despite + * this rule of thumb, there is no efficient way to calculate the entry + * count after compactions. Therefore, it is often easier to use a * dynamic bloom filter that will add extra space instead of allowing the * error rate to grow. 
- * + * * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf ) * * m denotes the number of bits in the Bloom filter (bitSize) * n denotes the number of elements inserted into the Bloom filter (maxKeys) * k represents the number of hash functions used (nbHash) * e represents the desired false positive rate for the bloom (err) - * + * * If we fix the error rate (e) and know the number of entries, then * the optimal bloom size m = -(n * ln(err) / (ln(2)^2) * ~= n * ln(err) / ln(0.6185) @@ -196,12 +164,12 @@ public class ByteBloomFilter implements BloomFilter { this.bloom = ByteBuffer.allocate(this.byteSize); assert this.bloom.hasArray(); } - + void sanityCheck() throws IllegalArgumentException { if(this.byteSize <= 0) { throw new IllegalArgumentException("maxValue must be > 0"); } - + if(this.hashCount <= 0) { throw new IllegalArgumentException("Hash function count must be > 0"); } @@ -209,12 +177,12 @@ public class ByteBloomFilter implements BloomFilter { if (this.hash == null) { throw new IllegalArgumentException("hashType must be known"); } - + if (this.keyCount < 0) { throw new IllegalArgumentException("must have positive keyCount"); } } - + void bloomCheck(ByteBuffer bloom) throws IllegalArgumentException { if (this.byteSize != bloom.limit()) { throw new IllegalArgumentException( @@ -243,14 +211,14 @@ public class ByteBloomFilter implements BloomFilter { ++this.keyCount; } - + /** * Should only be used in tests when writing a bloom filter. */ boolean contains(byte [] buf) { return contains(buf, 0, buf.length, this.bloom); } - + /** * Should only be used in tests when writing a bloom filter. */ @@ -264,7 +232,7 @@ public class ByteBloomFilter implements BloomFilter { } @Override - public boolean contains(byte [] buf, int offset, int length, + public boolean contains(byte [] buf, int offset, int length, ByteBuffer theBloom) { if(theBloom.limit() != this.byteSize) { @@ -282,11 +250,11 @@ public class ByteBloomFilter implements BloomFilter { } return true; } - + //--------------------------------------------------------------------------- /** Private helpers */ - - /** + + /** * Set the bit at the specified index to 1. * * @param pos index of bit @@ -298,7 +266,7 @@ public class ByteBloomFilter implements BloomFilter { curByte |= bitvals[bitPos]; bloom.put(bytePos, curByte); } - + /** * Check if bit at specified index is 1. * @@ -312,37 +280,37 @@ public class ByteBloomFilter implements BloomFilter { curByte &= bitvals[bitPos]; return (curByte != 0); } - + @Override public int getKeyCount() { return this.keyCount; } - + @Override public int getMaxKeys() { return this.maxKeys; } - + @Override public int getByteSize() { return this.byteSize; } @Override - public void finalize() { + public void compactBloom() { // see if the actual size is exponentially smaller than expected. 
if (this.keyCount > 0 && this.bloom.hasArray()) { int pieces = 1; int newByteSize = this.byteSize; int newMaxKeys = this.maxKeys; - + // while exponentially smaller & folding is lossless while ( (newByteSize & 1) == 0 && newMaxKeys > (this.keyCount<<1) ) { pieces <<= 1; newByteSize >>= 1; newMaxKeys >>= 1; } - + // if we should fold these into pieces if (pieces > 1) { byte[] array = this.bloom.array(); @@ -351,7 +319,7 @@ public class ByteBloomFilter implements BloomFilter { int off = end; for(int p = 1; p < pieces; ++p) { for(int pos = start; pos < end; ++pos) { - array[pos] |= array[off++]; + array[pos] |= array[off++]; } } // folding done, only use a subset of this array @@ -388,9 +356,9 @@ public class ByteBloomFilter implements BloomFilter { public Writable getDataWriter() { return new DataWriter(); } - + private class MetaWriter implements Writable { - protected MetaWriter() {} + protected MetaWriter() {} @Override public void readFields(DataInput arg0) throws IOException { throw new IOException("Cant read with this class."); diff --git a/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java b/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java index f818279202f..bb33a629b85 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java +++ b/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java @@ -1,35 +1,6 @@ -/** +/* + * Copyright 2010 The Apache Software Foundation * - * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) - * All rights reserved. - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the distribution. - * - Neither the name of the University Catholique de Louvain - UCL - * nor the names of its contributors may be used to endorse or - * promote products derived from this software without specific prior - * written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS - * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE - * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, - * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -49,30 +20,30 @@ package org.apache.hadoop.hbase.util; +import org.apache.hadoop.io.Writable; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.io.Writable; - /** * Implements a dynamic Bloom filter, as defined in the INFOCOM 2006 paper. *
<p>
* A dynamic Bloom filter (DBF) makes use of a s * m bit matrix but - * each of the s rows is a standard Bloom filter. The creation + * each of the s rows is a standard Bloom filter. The creation * process of a DBF is iterative. At the start, the DBF is a 1 * m * bit matrix, i.e., it is composed of a single standard Bloom filter. - * It assumes that nr elements are recorded in the + * It assumes that nr elements are recorded in the * initial bit vector, where nr <= n (n is - * the cardinality of the set A to record in the filter). + * the cardinality of the set A to record in the filter). *
<p>
* As the size of A grows during the execution of the application, * several keys must be inserted in the DBF. When inserting a key into the DBF, * one must first get an active Bloom filter in the matrix. A Bloom filter is - * active when the number of recorded keys, nr, is + * active when the number of recorded keys, nr, is * strictly less than the current cardinality of A, n. - * If an active Bloom filter is found, the key is inserted and + * If an active Bloom filter is found, the key is inserted and * nr is incremented by one. On the other hand, if there * is no active Bloom filter, a new one is created (i.e., a new row is added to * the matrix) according to the current size of A and the element @@ -84,7 +55,7 @@ import org.apache.hadoop.io.Writable; * European Commission One-Lab Project 034819. * * @see BloomFilter A Bloom filter - * + * * @see Theory and Network Applications of Dynamic Bloom Filters */ public class DynamicByteBloomFilter implements BloomFilter { @@ -108,8 +79,7 @@ public class DynamicByteBloomFilter implements BloomFilter { * @param meta stored bloom meta data * @throws IllegalArgumentException meta data is invalid */ - public DynamicByteBloomFilter(ByteBuffer meta) - throws IllegalArgumentException { + public DynamicByteBloomFilter(ByteBuffer meta) throws IllegalArgumentException { int version = meta.getInt(); if (version != VERSION) throw new IllegalArgumentException("Bad version"); @@ -118,7 +88,7 @@ public class DynamicByteBloomFilter implements BloomFilter { this.hashType = meta.getInt(); this.readMatrixSize = meta.getInt(); this.curKeys = meta.getInt(); - + readSanityCheck(); this.matrix = new ByteBloomFilter[1]; @@ -126,12 +96,9 @@ public class DynamicByteBloomFilter implements BloomFilter { } /** - * Normal write constructor. Note that this doesn't allocate bloom data by + * Normal write constructor. Note that this doesn't allocate bloom data by * default. Instead, call allocBloom() before adding entries. - * @param bitSize The vector size of this filter. - * @param functionCount The number of hash function to consider. - * @param hashType type of the hashing function (see - * {@link org.apache.hadoop.util.hash.Hash}). + * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}). * @param keyInterval Maximum number of keys to record per Bloom filter row. 
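   * (Editor's illustrative note, not part of the original patch: with
   * keyInterval = 1000 and errorRate = 0.01 the filter starts as a single
   * ByteBloomFilter row sized for 1000 keys; once that row has recorded its
   * 1000 keys, the next add() finds no active row and appends a second one,
   * so getMaxKeys() grows to 2000 and getByteSize() doubles, while each row
   * keeps roughly its 1% false positive rate. contains() reports a possible
   * hit if any row matches.)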
* @throws IllegalArgumentException The input parameters were invalid */ @@ -141,7 +108,7 @@ public class DynamicByteBloomFilter implements BloomFilter { this.errorRate = errorRate; this.hashType = hashType; this.curKeys = 0; - + if(keyInterval <= 0) { throw new IllegalArgumentException("keyCount must be > 0"); } @@ -164,7 +131,7 @@ public class DynamicByteBloomFilter implements BloomFilter { throw new IllegalArgumentException("matrix size must be known"); } } - + @Override public void add(byte []buf, int offset, int len) { BloomFilter bf = getCurBloom(); @@ -207,17 +174,17 @@ public class DynamicByteBloomFilter implements BloomFilter { public boolean contains(byte [] buf, ByteBuffer theBloom) { return contains(buf, 0, buf.length, theBloom); } - + @Override - public boolean contains(byte[] buf, int offset, int length, + public boolean contains(byte[] buf, int offset, int length, ByteBuffer theBloom) { if(offset + length > buf.length) { return false; } - + // current version assumes uniform size - int bytesPerBloom = this.matrix[0].getByteSize(); - + int bytesPerBloom = this.matrix[0].getByteSize(); + if(theBloom.limit() != bytesPerBloom * readMatrixSize) { throw new IllegalArgumentException("Bloom does not match expected size"); } @@ -233,7 +200,7 @@ public class DynamicByteBloomFilter implements BloomFilter { return true; } } - + // matched no bloom filters return false; } @@ -251,14 +218,14 @@ public class DynamicByteBloomFilter implements BloomFilter { public int getMaxKeys() { return bloomCount() * this.keyInterval; } - + @Override public int getByteSize() { return bloomCount() * this.matrix[0].getByteSize(); } @Override - public void finalize() { + public void compactBloom() { } /** @@ -298,9 +265,9 @@ public class DynamicByteBloomFilter implements BloomFilter { public Writable getDataWriter() { return new DataWriter(); } - + private class MetaWriter implements Writable { - protected MetaWriter() {} + protected MetaWriter() {} @Override public void readFields(DataInput arg0) throws IOException { throw new IOException("Cant read with this class."); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index 52ebac3b1ec..9c1c95c6889 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -57,9 +57,9 @@ import org.junit.Test; */ public class TestFSErrorsExposed { private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class); - + HBaseTestingUtility util = new HBaseTestingUtility(); - + /** * Injects errors into the pread calls of an on-disk file, and makes * sure those bubble up to the HFile scanner @@ -73,21 +73,21 @@ public class TestFSErrorsExposed { StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2*1024); TestStoreFile.writeStoreFile( writer, Bytes.toBytes("cf"), Bytes.toBytes("qual")); - + StoreFile sf = new StoreFile(fs, writer.getPath(), false, util.getConfiguration(), StoreFile.BloomType.NONE, false); - HFile.Reader reader = sf.createReader(); + StoreFile.Reader reader = sf.createReader(); HFileScanner scanner = reader.getScanner(false, true); - + FaultyInputStream inStream = fs.inStreams.get(0).get(); assertNotNull(inStream); - + scanner.seekTo(); // Do at least one successful read assertTrue(scanner.next()); - + inStream.startFaults(); - + try { int scanned=0; while (scanner.next()) { @@ -100,7 +100,7 @@ public class 
TestFSErrorsExposed { } reader.close(); } - + /** * Injects errors into the pread calls of an on-disk file, and makes * sure those bubble up to the StoreFileScanner @@ -111,25 +111,25 @@ public class TestFSErrorsExposed { HBaseTestingUtility.getTestDir("internalScannerExposesErrors"), "regionname"), "familyname"); FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem()); - HFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024); + StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024); TestStoreFile.writeStoreFile( writer, Bytes.toBytes("cf"), Bytes.toBytes("qual")); - + StoreFile sf = new StoreFile(fs, writer.getPath(), false, util.getConfiguration(), BloomType.NONE, false); List scanners = StoreFileScanner.getScannersForStoreFiles( Collections.singletonList(sf), false, true); KeyValueScanner scanner = scanners.get(0); - + FaultyInputStream inStream = fs.inStreams.get(0).get(); assertNotNull(inStream); - + scanner.seek(KeyValue.LOWESTKEY); // Do at least one successful read assertNotNull(scanner.next()); - + inStream.startFaults(); - + try { int scanned=0; while (scanner.next() != null) { @@ -142,7 +142,7 @@ public class TestFSErrorsExposed { } scanner.close(); } - + /** * Cluster test which starts a region server with a region, then * removes the data from HDFS underneath it, and ensures that @@ -154,25 +154,25 @@ public class TestFSErrorsExposed { util.startMiniCluster(1); byte[] tableName = Bytes.toBytes("table"); byte[] fam = Bytes.toBytes("fam"); - + HBaseAdmin admin = new HBaseAdmin(util.getConfiguration()); HTableDescriptor desc = new HTableDescriptor(tableName); desc.addFamily(new HColumnDescriptor( fam, 1, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, HConstants.FOREVER, "NONE")); admin.createTable(desc); - + HTable table = new HTable(tableName); - + // Load some data util.loadTable(table, fam); table.flushCommits(); util.flush(); util.countRows(table); - + // Kill the DFS cluster util.getDFSCluster().shutdownDataNodes(); - + try { util.countRows(table); fail("Did not fail to count after removing data"); @@ -180,16 +180,16 @@ public class TestFSErrorsExposed { LOG.info("Got expected error", e); assertTrue(e.getMessage().contains("Could not seek")); } - + } finally { util.shutdownMiniCluster(); } } - + static class FaultyFileSystem extends FilterFileSystem { List> inStreams = new ArrayList>(); - + public FaultyFileSystem(FileSystem testFileSystem) { super(testFileSystem); } @@ -202,16 +202,16 @@ public class TestFSErrorsExposed { return faulty; } } - + static class FaultyInputStream extends FSDataInputStream { boolean faultsStarted = false; - + public FaultyInputStream(InputStream in) throws IOException { super(in); } public void startFaults() { - faultsStarted = true; + faultsStarted = true; } public int read(long position, byte[] buffer, int offset, int length) @@ -219,7 +219,7 @@ public class TestFSErrorsExposed { injectFault(); return ((PositionedReadable)in).read(position, buffer, offset, length); } - + private void injectFault() throws IOException { if (faultsStarted) { throw new IOException("Fault injected"); @@ -227,5 +227,5 @@ public class TestFSErrorsExposed { } } - + } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index e65ae34b70f..57269ef5f4c 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -82,14 
+82,14 @@ public class TestStoreFile extends HBaseTestCase { */ public void testBasicHalfMapFile() throws Exception { // Make up a directory hierarchy that has a regiondir and familyname. - HFile.Writer writer = StoreFile.createWriter(this.fs, + StoreFile.Writer writer = StoreFile.createWriter(this.fs, new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024); writeStoreFile(writer); - checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, + checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, StoreFile.BloomType.NONE, false)); } - private void writeStoreFile(final HFile.Writer writer) throws IOException { + private void writeStoreFile(final StoreFile.Writer writer) throws IOException { writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName())); } /* @@ -98,7 +98,7 @@ public class TestStoreFile extends HBaseTestCase { * @param writer * @throws IOException */ - public static void writeStoreFile(final HFile.Writer writer, byte[] fam, byte[] qualifier) + public static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier) throws IOException { long now = System.currentTimeMillis(); try { @@ -123,11 +123,11 @@ public class TestStoreFile extends HBaseTestCase { Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname"); Path dir = new Path(storedir, "1234567890"); // Make a store file and write data to it. - HFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024); + StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024); writeStoreFile(writer); - StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, StoreFile.BloomType.NONE, false); - HFile.Reader reader = hsf.createReader(); + StoreFile.Reader reader = hsf.createReader(); // Split on a row, not in middle of row. Midkey returned by reader // may be in middle of row. Create new one with empty column and // timestamp. @@ -137,7 +137,7 @@ public class TestStoreFile extends HBaseTestCase { byte [] finalRow = kv.getRow(); // Make a reference Path refPath = StoreFile.split(fs, dir, hsf, midRow, Range.top); - StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, + StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, StoreFile.BloomType.NONE, false); // Now confirm that I can read from the reference and that it only gets // keys from top half of the file. @@ -174,9 +174,9 @@ public class TestStoreFile extends HBaseTestCase { Path bottomPath = StoreFile.split(this.fs, bottomDir, f, midRow, Range.bottom); // Make readers on top and bottom. 
- HFile.Reader top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.Reader top = new StoreFile(this.fs, topPath, true, conf, StoreFile.BloomType.NONE, false).createReader(); - HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, StoreFile.BloomType.NONE, false).createReader(); ByteBuffer previous = null; LOG.info("Midkey: " + midKV.toString()); @@ -229,9 +229,9 @@ public class TestStoreFile extends HBaseTestCase { topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top); bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, Range.bottom); - top = new StoreFile(this.fs, topPath, true, conf, + top = new StoreFile(this.fs, topPath, true, conf, StoreFile.BloomType.NONE, false).createReader(); - bottom = new StoreFile(this.fs, bottomPath, true, conf, + bottom = new StoreFile(this.fs, bottomPath, true, conf, StoreFile.BloomType.NONE, false).createReader(); bottomScanner = bottom.getScanner(false, false); int count = 0; @@ -250,7 +250,6 @@ public class TestStoreFile extends HBaseTestCase { assertTrue(topScanner.getReader().getComparator().compare(key.array(), key.arrayOffset(), key.limit(), badmidkey, 0, badmidkey.length) >= 0); if (first) { - first = false; first = false; KeyValue keyKV = KeyValue.createKeyValueFromKey(key); LOG.info("First top when key < bottom: " + keyKV); @@ -275,9 +274,9 @@ public class TestStoreFile extends HBaseTestCase { topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top); bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, Range.bottom); - top = new StoreFile(this.fs, topPath, true, conf, + top = new StoreFile(this.fs, topPath, true, conf, StoreFile.BloomType.NONE, false).createReader(); - bottom = new StoreFile(this.fs, bottomPath, true, conf, + bottom = new StoreFile(this.fs, bottomPath, true, conf, StoreFile.BloomType.NONE, false).createReader(); first = true; bottomScanner = bottom.getScanner(false, false); @@ -317,44 +316,44 @@ public class TestStoreFile extends HBaseTestCase { fs.delete(f.getPath(), true); } } - + private static String ROOT_DIR = HBaseTestingUtility.getTestDir("TestStoreFile").toString(); private static String localFormatter = "%010d"; - + public void testBloomFilter() throws Exception { FileSystem fs = FileSystem.getLocal(conf); conf.setFloat("io.hfile.bloom.error.rate", (float)0.01); conf.setBoolean("io.hfile.bloom.enabled", true); - + // write the file Path f = new Path(ROOT_DIR, getName()); - StoreFile.Writer writer = new StoreFile.Writer(fs, f, - StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, + StoreFile.Writer writer = new StoreFile.Writer(fs, f, + StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000); long now = System.currentTimeMillis(); for (int i = 0; i < 2000; i += 2) { - String row = String.format(localFormatter, Integer.valueOf(i)); + String row = String.format(localFormatter, i); KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(), "col".getBytes(), now, "value".getBytes()); writer.append(kv); } writer.close(); - + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); reader.loadFileInfo(); reader.loadBloomfilter(); - HFileScanner scanner = reader.getScanner(false, false); + StoreFileScanner scanner = reader.getStoreFileScanner(false, false); // check false positives rate int falsePos = 0; int falseNeg = 0; for (int i = 0; i < 2000; i++) { - String row = 
String.format(localFormatter, Integer.valueOf(i)); + String row = String.format(localFormatter, i); TreeSet columns = new TreeSet(); columns.add("family:col".getBytes()); - + boolean exists = scanner.shouldSeek(row.getBytes(), columns); if (i % 2 == 0) { if (!exists) falseNeg++; @@ -369,19 +368,19 @@ public class TestStoreFile extends HBaseTestCase { System.out.println("False positives: " + falsePos); assertTrue(falsePos < 2); } - + public void testBloomTypes() throws Exception { float err = (float) 0.01; FileSystem fs = FileSystem.getLocal(conf); conf.setFloat("io.hfile.bloom.error.rate", err); conf.setBoolean("io.hfile.bloom.enabled", true); - + int rowCount = 50; int colCount = 10; int versions = 2; - + // run once using columns and once using rows - StoreFile.BloomType[] bt = + StoreFile.BloomType[] bt = {StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW}; int[] expKeys = {rowCount*colCount, rowCount}; // below line deserves commentary. it is expected bloom false positives @@ -393,19 +392,19 @@ public class TestStoreFile extends HBaseTestCase { for (int x : new int[]{0,1}) { // write the file Path f = new Path(ROOT_DIR, getName()); - StoreFile.Writer writer = new StoreFile.Writer(fs, f, - StoreFile.DEFAULT_BLOCKSIZE_SMALL, + StoreFile.Writer writer = new StoreFile.Writer(fs, f, + StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, conf, KeyValue.COMPARATOR, bt[x], expKeys[x]); - + long now = System.currentTimeMillis(); for (int i = 0; i < rowCount*2; i += 2) { // rows for (int j = 0; j < colCount*2; j += 2) { // column qualifiers - String row = String.format(localFormatter, Integer.valueOf(i)); - String col = String.format(localFormatter, Integer.valueOf(j)); - for (int k= 0; k < versions; ++k) { // versions - KeyValue kv = new KeyValue(row.getBytes(), - "family".getBytes(), ("col" + col).getBytes(), + String row = String.format(localFormatter, i); + String col = String.format(localFormatter, j); + for (int k= 0; k < versions; ++k) { // versions + KeyValue kv = new KeyValue(row.getBytes(), + "family".getBytes(), ("col" + col).getBytes(), now-k, Bytes.toBytes((long)-1)); writer.append(kv); } @@ -416,16 +415,16 @@ public class TestStoreFile extends HBaseTestCase { StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); reader.loadFileInfo(); reader.loadBloomfilter(); - HFileScanner scanner = reader.getScanner(false, false); - assertEquals(expKeys[x], reader.getBloomFilter().getKeyCount()); - + StoreFileScanner scanner = reader.getStoreFileScanner(false, false); + assertEquals(expKeys[x], reader.bloomFilter.getKeyCount()); + // check false positives rate int falsePos = 0; int falseNeg = 0; for (int i = 0; i < rowCount*2; ++i) { // rows for (int j = 0; j < colCount*2; ++j) { // column qualifiers - String row = String.format(localFormatter, Integer.valueOf(i)); - String col = String.format(localFormatter, Integer.valueOf(j)); + String row = String.format(localFormatter, i); + String col = String.format(localFormatter, j); TreeSet columns = new TreeSet(); columns.add(("col" + col).getBytes()); @@ -448,7 +447,6 @@ public class TestStoreFile extends HBaseTestCase { assertEquals(0, falseNeg); assertTrue(falsePos < 2*expErr[x]); } - } public void testFlushTimeComparator() {
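  // -------------------------------------------------------------------------
  // Editor's sketch (illustrative only, not part of the original patch):
  // a minimal end-to-end use of the API this change refactors -- writing with
  // StoreFile.Writer instead of HFile.Writer, reading back through
  // StoreFile.Reader, and probing the row bloom via
  // StoreFileScanner.shouldSeek(). It mirrors the calls in the tests above and
  // assumes the same imports as this test class; writeAndProbe() itself is a
  // hypothetical helper name.
  static boolean writeAndProbe(FileSystem fs, Configuration conf, Path file,
      byte[] row, byte[] family, byte[] qualifier) throws IOException {
    // Write one KeyValue into a store file that carries a row-level bloom.
    StoreFile.Writer writer = new StoreFile.Writer(fs, file,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
        conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 1000);
    writer.append(new KeyValue(row, family, qualifier,
        System.currentTimeMillis(), Bytes.toBytes("value")));
    writer.close();

    // Re-open the file and ask the bloom filter whether the row may exist.
    StoreFile.Reader reader = new StoreFile.Reader(fs, file, null, false);
    reader.loadFileInfo();
    reader.loadBloomfilter();
    StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
    TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    columns.add(qualifier);
    boolean mayContain = scanner.shouldSeek(row, columns);
    reader.close();
    return mayContain;
  }
  // Usage: writeAndProbe(fs, conf, new Path(ROOT_DIR, "sketch"),
  //     Bytes.toBytes("row1"), Bytes.toBytes("family"), Bytes.toBytes("col"));
  // returns true for rows that were written and (almost always) false for
  // rows that were not, subject to the configured false positive rate.
  // -------------------------------------------------------------------------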