HBASE-2501 Refactor StoreFile Code
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@958468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5d71084915
commit
35616d9205
|
@ -425,6 +425,7 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-2787 PE is confused about flushCommits
|
||||
HBASE-2707 Can't recover from a dead ROOT server if any exceptions happens
|
||||
during log splitting
|
||||
HBASE-2501 Refactor StoreFile Code
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -29,9 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -155,11 +152,6 @@ public class HalfStoreFileReader extends StoreFile.Reader {
|
|||
return this.delegate.seekBefore(key, offset, length);
|
||||
}
|
||||
|
||||
public boolean shouldSeek(byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
return this.delegate.shouldSeek(row, columns);
|
||||
}
|
||||
|
||||
public boolean seekTo() throws IOException {
|
||||
if (top) {
|
||||
int r = this.delegate.seekTo(splitkey);
|
||||
|
@ -209,7 +201,7 @@ public class HalfStoreFileReader extends StoreFile.Reader {
|
|||
return delegate.seekTo(key, offset, length);
|
||||
}
|
||||
|
||||
public Reader getReader() {
|
||||
public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
|
||||
return this.delegate.getReader();
|
||||
}
|
||||
|
||||
|
|
|
@ -455,7 +455,7 @@ public class HFile {
|
|||
appendFileInfo(this.fileinfo, k, v, true);
|
||||
}
|
||||
|
||||
FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v,
|
||||
static FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v,
|
||||
final boolean checkPrefix)
|
||||
throws IOException {
|
||||
if (k == null || v == null) {
|
||||
|
@ -1078,7 +1078,7 @@ public class HFile {
|
|||
}
|
||||
return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the first row key, or null if the file is empty.
|
||||
* TODO move this to StoreFile after Ryan's patch goes in
|
||||
|
@ -1122,7 +1122,7 @@ public class HFile {
|
|||
if (lastKey == null) return null;
|
||||
return KeyValue.createKeyValueFromKey(lastKey).getRow();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return number of K entries in this HFile's filter. Returns KV count if no filter.
|
||||
*/
|
||||
|
@ -1164,6 +1164,10 @@ public class HFile {
|
|||
}
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/*
|
||||
* Implementation of {@link HFileScanner} interface.
|
||||
*/
|
||||
|
@ -1248,11 +1252,6 @@ public class HFile {
|
|||
return true;
|
||||
}
|
||||
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
return true;
|
||||
}
|
||||
|
||||
public int seekTo(byte [] key) throws IOException {
|
||||
return seekTo(key, 0, key.length);
|
||||
}
|
||||
|
@ -1706,7 +1705,7 @@ public class HFile {
|
|||
super();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return true if the given file info key is reserved for internal
|
||||
* use by HFile.
|
||||
|
|
|
@ -65,17 +65,6 @@ public interface HFileScanner {
|
|||
*/
|
||||
public boolean seekBefore(byte [] key) throws IOException;
|
||||
public boolean seekBefore(byte []key, int offset, int length) throws IOException;
|
||||
/**
|
||||
* Optimization for single key lookups. If the file has a filter,
|
||||
* perform a lookup on the key.
|
||||
* @param row the row to scan
|
||||
* @param family the column family to scan
|
||||
* @param columns the array of column qualifiers to scan
|
||||
* @return False if the key definitely does not exist in this ScanFile
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns);
|
||||
/**
|
||||
* Positions this scanner at the start of the file.
|
||||
* @return False if empty file; i.e. a call to next would return false and
|
||||
|
|
|
@ -61,11 +61,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
|
||||
|
||||
public static String NAME = "completebulkload";
|
||||
|
||||
|
||||
public LoadIncrementalHFiles(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
|
||||
public LoadIncrementalHFiles() {
|
||||
super();
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
|
||||
private void usage() {
|
||||
System.err.println("usage: " + NAME +
|
||||
" /path/to/hfileoutputformat-output " +
|
||||
" /path/to/hfileoutputformat-output " +
|
||||
"tablename");
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
private static class LoadQueueItem {
|
||||
final byte[] family;
|
||||
final Path hfilePath;
|
||||
|
||||
|
||||
public LoadQueueItem(byte[] family, Path hfilePath) {
|
||||
this.family = family;
|
||||
this.hfilePath = hfilePath;
|
||||
|
@ -102,17 +102,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
|
||||
throws IOException {
|
||||
FileSystem fs = hfofDir.getFileSystem(getConf());
|
||||
|
||||
|
||||
if (!fs.exists(hfofDir)) {
|
||||
throw new FileNotFoundException("HFileOutputFormat dir " +
|
||||
hfofDir + " not found");
|
||||
hfofDir + " not found");
|
||||
}
|
||||
|
||||
|
||||
FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
|
||||
if (familyDirStatuses == null) {
|
||||
throw new FileNotFoundException("No families found in " + hfofDir);
|
||||
}
|
||||
|
||||
|
||||
Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
|
||||
for (FileStatus stat : familyDirStatuses) {
|
||||
if (!stat.isDir()) {
|
||||
|
@ -144,13 +144,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
throws TableNotFoundException, IOException
|
||||
{
|
||||
HConnection conn = table.getConnection();
|
||||
|
||||
|
||||
if (!conn.isTableAvailable(table.getTableName())) {
|
||||
throw new TableNotFoundException("Table " +
|
||||
throw new TableNotFoundException("Table " +
|
||||
Bytes.toStringBinary(table.getTableName()) +
|
||||
"is not currently available.");
|
||||
}
|
||||
|
||||
|
||||
Deque<LoadQueueItem> queue = null;
|
||||
try {
|
||||
queue = discoverLoadQueue(hfofDir);
|
||||
|
@ -193,7 +193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
} finally {
|
||||
hfr.close();
|
||||
}
|
||||
|
||||
|
||||
LOG.info("Trying to load hfile=" + hfilePath +
|
||||
" first=" + Bytes.toStringBinary(first) +
|
||||
" last=" + Bytes.toStringBinary(last));
|
||||
|
@ -202,7 +202,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
LOG.info("hfile " + hfilePath + " has no entries, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// We use a '_' prefix which is ignored when walking directory trees
|
||||
// above.
|
||||
final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
|
||||
|
@ -217,8 +217,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
if (!hri.containsRange(first, last)) {
|
||||
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
|
||||
"region. Splitting...");
|
||||
|
||||
HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
|
||||
|
||||
HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
|
||||
Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
|
||||
Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
|
||||
splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
|
||||
|
@ -231,14 +231,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
byte[] regionName = location.getRegionInfo().getRegionName();
|
||||
server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Split a storefile into a top and bottom half, maintaining
|
||||
* the metadata, recreating bloom filters, etc.
|
||||
|
@ -251,11 +251,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
// Open reader with no block cache, and not in-memory
|
||||
Reference topReference = new Reference(splitKey, Range.top);
|
||||
Reference bottomReference = new Reference(splitKey, Range.bottom);
|
||||
|
||||
|
||||
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
|
||||
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copy half of an HFile into a new HFile.
|
||||
*/
|
||||
|
@ -265,15 +265,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
throws IOException {
|
||||
FileSystem fs = inFile.getFileSystem(conf);
|
||||
HalfStoreFileReader halfReader = null;
|
||||
HFile.Writer halfWriter = null;
|
||||
StoreFile.Writer halfWriter = null;
|
||||
try {
|
||||
halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
|
||||
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
|
||||
|
||||
|
||||
int blocksize = familyDescriptor.getBlocksize();
|
||||
Algorithm compression = familyDescriptor.getCompression();
|
||||
BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
|
||||
|
||||
|
||||
halfWriter = new StoreFile.Writer(
|
||||
fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
|
||||
bloomFilterType, 0);
|
||||
|
@ -283,7 +283,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
KeyValue kv = scanner.getKeyValue();
|
||||
halfWriter.append(kv);
|
||||
} while (scanner.next());
|
||||
|
||||
|
||||
for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
|
||||
if (shouldCopyHFileMetaKey(entry.getKey())) {
|
||||
halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
|
||||
|
@ -292,9 +292,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
} finally {
|
||||
if (halfWriter != null) halfWriter.close();
|
||||
if (halfReader != null) halfReader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static boolean shouldCopyHFileMetaKey(byte[] key) {
|
||||
return !HFile.isReservedFileInfoKey(key);
|
||||
}
|
||||
|
@ -306,10 +306,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
usage();
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
Path hfofDir = new Path(args[0]);
|
||||
HTable table = new HTable(args[1]);
|
||||
|
||||
|
||||
doBulkLoad(hfofDir, table);
|
||||
return 0;
|
||||
}
|
||||
|
@ -317,5 +317,5 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
public static void main(String[] args) throws Exception {
|
||||
ToolRunner.run(new LoadIncrementalHFiles(), args);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
|||
* @return True if more.
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean next(HFile.Writer writer) throws IOException {
|
||||
public boolean next(StoreFile.Writer writer) throws IOException {
|
||||
KeyValue row = heap.peek();
|
||||
if (row == null) {
|
||||
close();
|
||||
|
|
|
@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -59,14 +58,24 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* A Store holds a column family in a Region. Its a memstore and a set of zero
|
||||
* or more StoreFiles, which stretch backwards over time.
|
||||
*
|
||||
* <p>There's no reason to consider append-logging at this level; all logging
|
||||
* and locking is handled at the HRegion level. Store just provides
|
||||
* services to manage sets of StoreFiles. One of the most important of those
|
||||
* services is compaction services where files are aggregated once they pass
|
||||
* a configurable threshold.
|
||||
* A Store holds a column family in a Region. Its a memstore and a set of zero
|
||||
* or more StoreFiles, which stretch backwards over time.
|
||||
*
|
||||
* <p>There's no reason to consider append-logging at this level; all logging
|
||||
* and locking is handled at the HRegion level. Store just provides
|
||||
* services to manage sets of StoreFiles. One of the most important of those
|
||||
* services is compaction services where files are aggregated once they pass
|
||||
* a configurable threshold.
|
||||
*
|
||||
* <p>The only thing having to do with logs that Store needs to deal with is
|
||||
* the reconstructionLog. This is a segment of an HRegion's log that might
|
||||
* NOT be present upon startup. If the param is NULL, there's nothing to do.
|
||||
* If the param is non-NULL, we need to process the log to reconstruct
|
||||
* a TreeMap that might not have been written to disk before the process
|
||||
* died.
|
||||
*
|
||||
* <p>It's assumed that after this constructor returns, the reconstructionLog
|
||||
* file will be deleted (by whoever has instantiated the Store).
|
||||
*
|
||||
* <p>Locking and transactions are handled at a higher level. This API should
|
||||
* not be called directly but by an HRegion manager.
|
||||
|
@ -303,7 +312,10 @@ public class Store implements HeapSize {
|
|||
reader.loadFileInfo();
|
||||
|
||||
byte[] firstKey = reader.getFirstRowKey();
|
||||
byte[] lastKey = reader.getLastRowKey();
|
||||
byte[] lk = reader.getLastKey();
|
||||
byte[] lastKey =
|
||||
(lk == null) ? null :
|
||||
KeyValue.createKeyValueFromKey(lk).getRow();
|
||||
|
||||
LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
|
||||
" last=" + Bytes.toStringBinary(lastKey));
|
||||
|
@ -423,8 +435,8 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
|
||||
final long logCacheFlushId)
|
||||
throws IOException {
|
||||
final long logCacheFlushId)
|
||||
throws IOException {
|
||||
StoreFile.Writer writer = null;
|
||||
long flushed = 0;
|
||||
// Don't flush if there are no entries.
|
||||
|
@ -462,7 +474,7 @@ public class Store implements HeapSize {
|
|||
|
||||
StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
|
||||
this.conf, this.family.getBloomFilterType(), this.inMemory);
|
||||
Reader r = sf.createReader();
|
||||
StoreFile.Reader r = sf.createReader();
|
||||
this.storeSize += r.length();
|
||||
if(LOG.isInfoEnabled()) {
|
||||
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
|
||||
|
@ -601,7 +613,7 @@ public class Store implements HeapSize {
|
|||
LOG.warn("Path is null for " + file);
|
||||
return null;
|
||||
}
|
||||
Reader r = file.getReader();
|
||||
StoreFile.Reader r = file.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + file + " has a null Reader");
|
||||
return null;
|
||||
|
@ -653,7 +665,7 @@ public class Store implements HeapSize {
|
|||
this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
|
||||
(references? ", hasReferences=true,": " ") + " into " +
|
||||
region.getTmpDir() + ", seqid=" + maxId);
|
||||
HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
|
||||
StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
|
||||
// Move the compaction into place.
|
||||
StoreFile sf = completeCompaction(filesToCompact, writer);
|
||||
if (LOG.isInfoEnabled()) {
|
||||
|
@ -689,8 +701,7 @@ public class Store implements HeapSize {
|
|||
* @param dir
|
||||
* @throws IOException
|
||||
*/
|
||||
private static long getLowestTimestamp(FileSystem fs, Path dir)
|
||||
throws IOException {
|
||||
private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException {
|
||||
FileStatus[] stats = fs.listStatus(dir);
|
||||
if (stats == null || stats.length == 0) {
|
||||
return 0l;
|
||||
|
@ -716,8 +727,7 @@ public class Store implements HeapSize {
|
|||
* @param filesToCompact Files to compact. Can be null.
|
||||
* @return True if we should run a major compaction.
|
||||
*/
|
||||
private boolean isMajorCompaction(final List<StoreFile> filesToCompact)
|
||||
throws IOException {
|
||||
private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
|
||||
boolean result = false;
|
||||
if (filesToCompact == null || filesToCompact.isEmpty() ||
|
||||
majorCompactionTime == 0) {
|
||||
|
@ -758,9 +768,9 @@ public class Store implements HeapSize {
|
|||
* nothing made it through the compaction.
|
||||
* @throws IOException
|
||||
*/
|
||||
private HFile.Writer compact(final List<StoreFile> filesToCompact,
|
||||
final boolean majorCompaction, final long maxId)
|
||||
throws IOException {
|
||||
private StoreFile.Writer compact(final List<StoreFile> filesToCompact,
|
||||
final boolean majorCompaction, final long maxId)
|
||||
throws IOException {
|
||||
// calculate maximum key count after compaction (for blooms)
|
||||
int maxKeyCount = 0;
|
||||
for (StoreFile file : filesToCompact) {
|
||||
|
@ -850,8 +860,8 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
|
||||
final HFile.Writer compactedFile)
|
||||
throws IOException {
|
||||
final StoreFile.Writer compactedFile)
|
||||
throws IOException {
|
||||
// 1. Moving the new files into place -- if there is a new file (may not
|
||||
// be if all cells were expired or deleted).
|
||||
StoreFile result = null;
|
||||
|
@ -907,7 +917,7 @@ public class Store implements HeapSize {
|
|||
// 4. Compute new store size
|
||||
this.storeSize = 0L;
|
||||
for (StoreFile hsf : this.storefiles) {
|
||||
Reader r = hsf.getReader();
|
||||
StoreFile.Reader r = hsf.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + hsf + " has a null Reader");
|
||||
continue;
|
||||
|
@ -970,8 +980,7 @@ public class Store implements HeapSize {
|
|||
* @return Found keyvalue or null if none found.
|
||||
* @throws IOException
|
||||
*/
|
||||
KeyValue getRowKeyAtOrBefore(final KeyValue kv)
|
||||
throws IOException {
|
||||
KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException {
|
||||
GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
|
||||
this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
|
||||
this.lock.readLock().lock();
|
||||
|
@ -997,9 +1006,9 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void rowAtOrBeforeFromStoreFile(final StoreFile f,
|
||||
final GetClosestRowBeforeTracker state)
|
||||
throws IOException {
|
||||
Reader r = f.getReader();
|
||||
final GetClosestRowBeforeTracker state)
|
||||
throws IOException {
|
||||
StoreFile.Reader r = f.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + f + " has a null Reader");
|
||||
return;
|
||||
|
@ -1049,8 +1058,9 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
private boolean seekToScanner(final HFileScanner scanner,
|
||||
final KeyValue firstOnRow, final KeyValue firstKV)
|
||||
throws IOException {
|
||||
final KeyValue firstOnRow,
|
||||
final KeyValue firstKV)
|
||||
throws IOException {
|
||||
KeyValue kv = firstOnRow;
|
||||
// If firstOnRow < firstKV, set to firstKV
|
||||
if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
|
||||
|
@ -1070,8 +1080,9 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
private boolean walkForwardInSingleRow(final HFileScanner scanner,
|
||||
final KeyValue firstOnRow, final GetClosestRowBeforeTracker state)
|
||||
throws IOException {
|
||||
final KeyValue firstOnRow,
|
||||
final GetClosestRowBeforeTracker state)
|
||||
throws IOException {
|
||||
boolean foundCandidate = false;
|
||||
do {
|
||||
KeyValue kv = scanner.getKeyValue();
|
||||
|
@ -1129,7 +1140,7 @@ public class Store implements HeapSize {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
Reader r = sf.getReader();
|
||||
StoreFile.Reader r = sf.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("Storefile " + sf + " Reader is null");
|
||||
continue;
|
||||
|
@ -1141,7 +1152,7 @@ public class Store implements HeapSize {
|
|||
largestSf = sf;
|
||||
}
|
||||
}
|
||||
HFile.Reader r = largestSf.getReader();
|
||||
StoreFile.Reader r = largestSf.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("Storefile " + largestSf + " Reader is null");
|
||||
return null;
|
||||
|
@ -1217,7 +1228,7 @@ public class Store implements HeapSize {
|
|||
long getStorefilesSize() {
|
||||
long size = 0;
|
||||
for (StoreFile s: storefiles) {
|
||||
Reader r = s.getReader();
|
||||
StoreFile.Reader r = s.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + s + " has a null Reader");
|
||||
continue;
|
||||
|
@ -1233,7 +1244,7 @@ public class Store implements HeapSize {
|
|||
long getStorefilesIndexSize() {
|
||||
long size = 0;
|
||||
for (StoreFile s: storefiles) {
|
||||
Reader r = s.getReader();
|
||||
StoreFile.Reader r = s.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + s + " has a null Reader");
|
||||
continue;
|
||||
|
@ -1284,7 +1295,7 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
static boolean getClosest(final HFileScanner s, final KeyValue kv)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
// Pass offsets to key content of a KeyValue; thats whats in the hfile index.
|
||||
int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
|
||||
if (result < 0) {
|
||||
|
@ -1311,7 +1322,7 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
KeyComparator keyComparator = this.comparator.getRawComparator();
|
||||
|
||||
// Column matching and version enforcement
|
||||
|
@ -1333,7 +1344,7 @@ public class Store implements HeapSize {
|
|||
// Get storefiles for this store
|
||||
List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
|
||||
for (StoreFile sf : Iterables.reverse(this.storefiles)) {
|
||||
HFile.Reader r = sf.getReader();
|
||||
StoreFile.Reader r = sf.getReader();
|
||||
if (r == null) {
|
||||
LOG.warn("StoreFile " + sf + " has a null Reader");
|
||||
continue;
|
||||
|
@ -1367,8 +1378,8 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
public long updateColumnValue(byte [] row, byte [] f,
|
||||
byte [] qualifier, long newValue)
|
||||
throws IOException {
|
||||
byte [] qualifier, long newValue)
|
||||
throws IOException {
|
||||
List<KeyValue> result = new ArrayList<KeyValue>();
|
||||
KeyComparator keyComparator = this.comparator.getRawComparator();
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
|
|||
import org.apache.hadoop.hbase.util.ByteBloomFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Hash;
|
||||
import org.apache.hadoop.io.RawComparator;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
|
@ -56,8 +56,8 @@ import java.util.Collections;
|
|||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.SortedSet;
|
||||
import java.util.Random;
|
||||
import java.util.SortedSet;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -71,18 +71,53 @@ import java.util.regex.Pattern;
|
|||
* sitting in the Filesystem. To refer to it, create a StoreFile instance
|
||||
* passing filesystem and path. To read, call {@link #createReader()}.
|
||||
* <p>StoreFiles may also reference store files in another Store.
|
||||
*
|
||||
* The reason for this weird pattern where you use a different instance for the
|
||||
* writer and a reader is that we write once but read a lot more.
|
||||
*/
|
||||
public class StoreFile {
|
||||
static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
|
||||
|
||||
private static final String HFILE_CACHE_SIZE_KEY = "hfile.block.cache.size";
|
||||
// Config keys.
|
||||
static final String IO_STOREFILE_BLOOM_ERROR_RATE = "io.storefile.bloom.error.rate";
|
||||
static final String IO_STOREFILE_BLOOM_MAX_FOLD = "io.storefile.bloom.max.fold";
|
||||
static final String IO_STOREFILE_BLOOM_ENABLED = "io.storefile.bloom.enabled";
|
||||
static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size";
|
||||
|
||||
private static BlockCache hfileBlockCache = null;
|
||||
public static enum BloomType {
|
||||
/**
|
||||
* Bloomfilters disabled
|
||||
*/
|
||||
NONE,
|
||||
/**
|
||||
* Bloom enabled with Table row as Key
|
||||
*/
|
||||
ROW,
|
||||
/**
|
||||
* Bloom enabled with Table row & column (family+qualifier) as Key
|
||||
*/
|
||||
ROWCOL
|
||||
}
|
||||
// Keys for fileinfo values in HFile
|
||||
/** Max Sequence ID in FileInfo */
|
||||
public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
|
||||
/** Major compaction flag in FileInfo */
|
||||
public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
|
||||
/** Bloom filter Type in FileInfo */
|
||||
static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
|
||||
|
||||
/** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
|
||||
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
|
||||
/** Meta data block name for bloom filter data (ie: bloom bits) */
|
||||
static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
|
||||
|
||||
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
|
||||
// Need to make it 8k for testing.
|
||||
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
|
||||
|
||||
|
||||
private static BlockCache hfileBlockCache = null;
|
||||
|
||||
private final FileSystem fs;
|
||||
// This file's path.
|
||||
private final Path path;
|
||||
|
@ -96,15 +131,9 @@ public class StoreFile {
|
|||
private boolean inMemory;
|
||||
|
||||
// Keys for metadata stored in backing HFile.
|
||||
/** Constant for the max sequence ID meta */
|
||||
public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
|
||||
// Set when we obtain a Reader.
|
||||
private long sequenceid = -1;
|
||||
|
||||
/** Constant for major compaction meta */
|
||||
public static final byte [] MAJOR_COMPACTION_KEY =
|
||||
Bytes.toBytes("MAJOR_COMPACTION_KEY");
|
||||
|
||||
// If true, this file was product of a major compaction. Its then set
|
||||
// whenever you get a Reader.
|
||||
private AtomicBoolean majorCompaction = null;
|
||||
|
@ -115,12 +144,6 @@ public class StoreFile {
|
|||
public static final byte[] BULKLOAD_TIME_KEY =
|
||||
Bytes.toBytes("BULKLOAD_TIMESTAMP");
|
||||
|
||||
|
||||
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
|
||||
static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
|
||||
static final byte[] BLOOM_FILTER_TYPE_KEY =
|
||||
Bytes.toBytes("BLOOM_FILTER_TYPE");
|
||||
|
||||
/**
|
||||
* Map of the metadata entries in the corresponding HFile
|
||||
*/
|
||||
|
@ -134,7 +157,8 @@ public class StoreFile {
|
|||
private static final Pattern REF_NAME_PARSER =
|
||||
Pattern.compile("^(\\d+)(?:\\.(.+))?$");
|
||||
|
||||
private volatile StoreFile.Reader reader;
|
||||
// StoreFile.Reader
|
||||
private volatile Reader reader;
|
||||
|
||||
// Used making file ids.
|
||||
private final static Random rand = new Random();
|
||||
|
@ -153,9 +177,13 @@ public class StoreFile {
|
|||
* @param bt The bloom type to use for this store file
|
||||
* @throws IOException When opening the reader fails.
|
||||
*/
|
||||
StoreFile(final FileSystem fs, final Path p, final boolean blockcache,
|
||||
final Configuration conf, final BloomType bt, final boolean inMemory)
|
||||
throws IOException {
|
||||
StoreFile(final FileSystem fs,
|
||||
final Path p,
|
||||
final boolean blockcache,
|
||||
final Configuration conf,
|
||||
final BloomType bt,
|
||||
final boolean inMemory)
|
||||
throws IOException {
|
||||
this.conf = conf;
|
||||
this.fs = fs;
|
||||
this.path = p;
|
||||
|
@ -167,7 +195,7 @@ public class StoreFile {
|
|||
}
|
||||
// ignore if the column family config says "no bloom filter"
|
||||
// even if there is one in the hfile.
|
||||
if (conf.getBoolean("io.hfile.bloom.enabled", true)) {
|
||||
if (conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
|
||||
this.bloomType = bt;
|
||||
} else {
|
||||
this.bloomType = BloomType.NONE;
|
||||
|
@ -307,11 +335,11 @@ public class StoreFile {
|
|||
public static synchronized BlockCache getBlockCache(Configuration conf) {
|
||||
if (hfileBlockCache != null) return hfileBlockCache;
|
||||
|
||||
float cachePercentage = conf.getFloat(HFILE_CACHE_SIZE_KEY, 0.0f);
|
||||
float cachePercentage = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
|
||||
// There should be a better way to optimize this. But oh well.
|
||||
if (cachePercentage == 0L) return null;
|
||||
if (cachePercentage > 1.0) {
|
||||
throw new IllegalArgumentException(HFILE_CACHE_SIZE_KEY +
|
||||
throw new IllegalArgumentException(HFILE_BLOCK_CACHE_SIZE_KEY +
|
||||
" must be between 0.0 and 1.0, not > 1.0");
|
||||
}
|
||||
|
||||
|
@ -337,18 +365,20 @@ public class StoreFile {
|
|||
* @throws IOException
|
||||
* @see #closeReader()
|
||||
*/
|
||||
private StoreFile.Reader open()
|
||||
throws IOException {
|
||||
private Reader open() throws IOException {
|
||||
|
||||
if (this.reader != null) {
|
||||
throw new IllegalAccessError("Already open");
|
||||
}
|
||||
|
||||
if (isReference()) {
|
||||
this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
|
||||
getBlockCache(), this.reference);
|
||||
} else {
|
||||
this.reader = new StoreFile.Reader(this.fs, this.path, getBlockCache(),
|
||||
this.reader = new Reader(this.fs, this.path, getBlockCache(),
|
||||
this.inMemory);
|
||||
}
|
||||
|
||||
// Load up indices and fileinfo.
|
||||
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
|
||||
// Read in our metadata.
|
||||
|
@ -365,8 +395,8 @@ public class StoreFile {
|
|||
this.sequenceid += 1;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
b = metadataMap.get(MAJOR_COMPACTION_KEY);
|
||||
if (b != null) {
|
||||
boolean mc = Bytes.toBoolean(b);
|
||||
|
@ -388,7 +418,7 @@ public class StoreFile {
|
|||
* @return Reader for StoreFile. creates if necessary
|
||||
* @throws IOException
|
||||
*/
|
||||
public StoreFile.Reader createReader() throws IOException {
|
||||
public Reader createReader() throws IOException {
|
||||
if (this.reader == null) {
|
||||
this.reader = open();
|
||||
}
|
||||
|
@ -400,7 +430,7 @@ public class StoreFile {
|
|||
* @throws IOException
|
||||
* @see {@link #createReader()}
|
||||
*/
|
||||
public StoreFile.Reader getReader() {
|
||||
public Reader getReader() {
|
||||
return this.reader;
|
||||
}
|
||||
|
||||
|
@ -455,9 +485,11 @@ public class StoreFile {
|
|||
* @return True if succeeded.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Path rename(final FileSystem fs, final Path src,
|
||||
final Path tgt)
|
||||
throws IOException {
|
||||
public static Path rename(final FileSystem fs,
|
||||
final Path src,
|
||||
final Path tgt)
|
||||
throws IOException {
|
||||
|
||||
if (!fs.exists(src)) {
|
||||
throw new FileNotFoundException(src.toString());
|
||||
}
|
||||
|
@ -469,19 +501,20 @@ public class StoreFile {
|
|||
|
||||
/**
|
||||
* Get a store file writer. Client is responsible for closing file when done.
|
||||
* If metadata, add BEFORE closing using
|
||||
* {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}.
|
||||
*
|
||||
* @param fs
|
||||
* @param dir Path to family directory. Makes the directory if doesn't exist.
|
||||
* Creates a file with a unique name in this directory.
|
||||
* @param blocksize size per filesystem block
|
||||
* @return HFile.Writer
|
||||
* @return StoreFile.Writer
|
||||
* @throws IOException
|
||||
*/
|
||||
public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir,
|
||||
final int blocksize)
|
||||
throws IOException {
|
||||
return createWriter(fs,dir,blocksize,null,null,null,BloomType.NONE,0);
|
||||
public static Writer createWriter(final FileSystem fs,
|
||||
final Path dir,
|
||||
final int blocksize)
|
||||
throws IOException {
|
||||
|
||||
return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -499,20 +532,25 @@ public class StoreFile {
|
|||
* @return HFile.Writer
|
||||
* @throws IOException
|
||||
*/
|
||||
public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir,
|
||||
final int blocksize, final Compression.Algorithm algorithm,
|
||||
final KeyValue.KVComparator c, final Configuration conf,
|
||||
BloomType bloomType, int maxKeySize)
|
||||
throws IOException {
|
||||
public static StoreFile.Writer createWriter(final FileSystem fs,
|
||||
final Path dir,
|
||||
final int blocksize,
|
||||
final Compression.Algorithm algorithm,
|
||||
final KeyValue.KVComparator c,
|
||||
final Configuration conf,
|
||||
BloomType bloomType,
|
||||
int maxKeySize)
|
||||
throws IOException {
|
||||
|
||||
if (!fs.exists(dir)) {
|
||||
fs.mkdirs(dir);
|
||||
}
|
||||
Path path = getUniqueFile(fs, dir);
|
||||
if(conf == null || !conf.getBoolean("io.hfile.bloom.enabled", true)) {
|
||||
if(conf == null || !conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
|
||||
bloomType = BloomType.NONE;
|
||||
}
|
||||
|
||||
return new StoreFile.Writer(fs, path, blocksize,
|
||||
return new Writer(fs, path, blocksize,
|
||||
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
|
||||
conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize);
|
||||
}
|
||||
|
@ -523,7 +561,7 @@ public class StoreFile {
|
|||
* @return random filename inside passed <code>dir</code>
|
||||
*/
|
||||
public static Path getUniqueFile(final FileSystem fs, final Path dir)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
if (!fs.getFileStatus(dir).isDir()) {
|
||||
throw new IOException("Expecting " + dir.toString() +
|
||||
" to be a directory");
|
||||
|
@ -539,7 +577,7 @@ public class StoreFile {
|
|||
* @throws IOException
|
||||
*/
|
||||
static Path getRandomFilename(final FileSystem fs, final Path dir)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
return getRandomFilename(fs, dir, null);
|
||||
}
|
||||
|
||||
|
@ -551,9 +589,10 @@ public class StoreFile {
|
|||
* @return Path to a file that doesn't exist at time of this invocation.
|
||||
* @throws IOException
|
||||
*/
|
||||
static Path getRandomFilename(final FileSystem fs, final Path dir,
|
||||
final String suffix)
|
||||
throws IOException {
|
||||
static Path getRandomFilename(final FileSystem fs,
|
||||
final Path dir,
|
||||
final String suffix)
|
||||
throws IOException {
|
||||
long id = -1;
|
||||
Path p = null;
|
||||
do {
|
||||
|
@ -564,8 +603,11 @@ public class StoreFile {
|
|||
return p;
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Write out a split reference.
|
||||
*
|
||||
* Package local so it doesnt leak out of regionserver.
|
||||
*
|
||||
* @param fs
|
||||
* @param splitDir Presumes path format is actually
|
||||
* <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
|
||||
|
@ -575,9 +617,12 @@ public class StoreFile {
|
|||
* @return Path to created reference.
|
||||
* @throws IOException
|
||||
*/
|
||||
static Path split(final FileSystem fs, final Path splitDir,
|
||||
final StoreFile f, final byte [] splitRow, final Reference.Range range)
|
||||
throws IOException {
|
||||
static Path split(final FileSystem fs,
|
||||
final Path splitDir,
|
||||
final StoreFile f,
|
||||
final byte [] splitRow,
|
||||
final Reference.Range range)
|
||||
throws IOException {
|
||||
// A reference to the bottom half of the hsf store file.
|
||||
Reference r = new Reference(splitRow, range);
|
||||
// Add the referred-to regions name as a dot separated suffix.
|
||||
|
@ -591,164 +636,19 @@ public class StoreFile {
|
|||
return r.write(fs, p);
|
||||
}
|
||||
|
||||
public static enum BloomType {
|
||||
/**
|
||||
* Bloomfilters disabled
|
||||
*/
|
||||
NONE,
|
||||
/**
|
||||
* Bloom enabled with Table row as Key
|
||||
*/
|
||||
ROW,
|
||||
/**
|
||||
* Bloom enabled with Table row & column (family+qualifier) as Key
|
||||
*/
|
||||
ROWCOL
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
|
||||
* local because it is an implementation detail of the HBase regionserver.
|
||||
*/
|
||||
public static class Reader extends HFile.Reader {
|
||||
/** Bloom Filter class. Caches only meta, pass in data */
|
||||
protected BloomFilter bloomFilter = null;
|
||||
/** Type of bloom filter (e.g. ROW vs ROWCOL) */
|
||||
protected BloomType bloomFilterType;
|
||||
|
||||
public Reader(FileSystem fs, Path path, BlockCache cache,
|
||||
boolean inMemory)
|
||||
throws IOException {
|
||||
super(fs, path, cache, inMemory);
|
||||
}
|
||||
|
||||
public Reader(final FSDataInputStream fsdis, final long size,
|
||||
final BlockCache cache, final boolean inMemory) {
|
||||
super(fsdis,size,cache,inMemory);
|
||||
bloomFilterType = BloomType.NONE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<byte [], byte []> loadFileInfo()
|
||||
throws IOException {
|
||||
Map<byte [], byte []> fi = super.loadFileInfo();
|
||||
|
||||
byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
|
||||
if (b != null) {
|
||||
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
|
||||
}
|
||||
|
||||
return fi;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the bloom filter for this HFile into memory.
|
||||
* Assumes the HFile has already been loaded
|
||||
*/
|
||||
public void loadBloomfilter() {
|
||||
if (this.bloomFilter != null) {
|
||||
return; // already loaded
|
||||
}
|
||||
|
||||
// see if bloom filter information is in the metadata
|
||||
try {
|
||||
ByteBuffer b = getMetaBlock(BLOOM_FILTER_META_KEY, false);
|
||||
if (b != null) {
|
||||
if (bloomFilterType == BloomType.NONE) {
|
||||
throw new IOException("valid bloom filter type not found in FileInfo");
|
||||
}
|
||||
this.bloomFilter = new ByteBloomFilter(b);
|
||||
LOG.info("Loaded " + (bloomFilterType==BloomType.ROW? "row":"col")
|
||||
+ " bloom filter metadata for " + name);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error reading bloom filter meta -- proceeding without", e);
|
||||
this.bloomFilter = null;
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Bad bloom filter meta -- proceeding without", e);
|
||||
this.bloomFilter = null;
|
||||
}
|
||||
}
|
||||
|
||||
BloomFilter getBloomFilter() {
|
||||
return this.bloomFilter;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bloom type information associated with this store file
|
||||
*/
|
||||
public BloomType getBloomFilterType() {
|
||||
return this.bloomFilterType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFilterEntries() {
|
||||
return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount()
|
||||
: super.getFilterEntries();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
|
||||
return new Scanner(this, cacheBlocks, pread);
|
||||
}
|
||||
|
||||
protected class Scanner extends HFile.Reader.Scanner {
|
||||
public Scanner(Reader r, boolean cacheBlocks, final boolean pread) {
|
||||
super(r, cacheBlocks, pread);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
if (bloomFilter == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
byte[] key;
|
||||
switch(bloomFilterType) {
|
||||
case ROW:
|
||||
key = row;
|
||||
break;
|
||||
case ROWCOL:
|
||||
if (columns.size() == 1) {
|
||||
byte[] col = columns.first();
|
||||
key = Bytes.add(row, col);
|
||||
break;
|
||||
}
|
||||
//$FALL-THROUGH$
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
ByteBuffer bloom = getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
|
||||
if (bloom != null) {
|
||||
return bloomFilter.contains(key, bloom);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error reading bloom filter data -- proceeding without",
|
||||
e);
|
||||
bloomFilter = null;
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Bad bloom filter data -- proceeding without", e);
|
||||
bloomFilter = null;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public static class Writer extends HFile.Writer {
|
||||
public static class Writer {
|
||||
private final BloomFilter bloomFilter;
|
||||
private final BloomType bloomType;
|
||||
private KVComparator kvComparator;
|
||||
private KeyValue lastKv = null;
|
||||
private byte[] lastByteArray = null;
|
||||
|
||||
protected HFile.Writer writer;
|
||||
/**
|
||||
* Creates an HFile.Writer that also write helpful meta data.
|
||||
* @param fs file system to write to
|
||||
|
@ -764,14 +664,14 @@ public class StoreFile {
|
|||
public Writer(FileSystem fs, Path path, int blocksize,
|
||||
Compression.Algorithm compress, final Configuration conf,
|
||||
final KVComparator comparator, BloomType bloomType, int maxKeys)
|
||||
throws IOException {
|
||||
super(fs, path, blocksize, compress, comparator.getRawComparator());
|
||||
throws IOException {
|
||||
writer = new HFile.Writer(fs, path, blocksize, compress, comparator.getRawComparator());
|
||||
|
||||
this.kvComparator = comparator;
|
||||
|
||||
if (bloomType != BloomType.NONE && conf != null) {
|
||||
float err = conf.getFloat("io.hfile.bloom.error.rate", (float)0.01);
|
||||
int maxFold = conf.getInt("io.hfile.bloom.max.fold", 7);
|
||||
float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
|
||||
int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7);
|
||||
|
||||
this.bloomFilter = new ByteBloomFilter(maxKeys, err,
|
||||
Hash.getHashType(conf), maxFold);
|
||||
|
@ -790,16 +690,13 @@ public class StoreFile {
|
|||
* @param majorCompaction True if this file is product of a major compaction
|
||||
* @throws IOException problem writing to FS
|
||||
*/
|
||||
public void appendMetadata(final long maxSequenceId,
|
||||
final boolean majorCompaction)
|
||||
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
|
||||
throws IOException {
|
||||
appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
|
||||
appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
|
||||
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
|
||||
writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(final KeyValue kv)
|
||||
throws IOException {
|
||||
public void append(final KeyValue kv) throws IOException {
|
||||
if (this.bloomFilter != null) {
|
||||
// only add to the bloom filter on a new, unique key
|
||||
boolean newKey = true;
|
||||
|
@ -846,28 +743,28 @@ public class StoreFile {
|
|||
this.lastKv = kv;
|
||||
}
|
||||
}
|
||||
super.append(kv);
|
||||
writer.append(kv);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(final byte [] key, final byte [] value)
|
||||
throws IOException {
|
||||
public Path getPath() {
|
||||
return this.writer.getPath();
|
||||
}
|
||||
|
||||
public void append(final byte [] key, final byte [] value) throws IOException {
|
||||
if (this.bloomFilter != null) {
|
||||
// only add to the bloom filter on a new row
|
||||
if(this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
|
||||
if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
|
||||
this.bloomFilter.add(key);
|
||||
this.lastByteArray = key;
|
||||
}
|
||||
}
|
||||
super.append(key, value);
|
||||
writer.append(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
throws IOException {
|
||||
public void close() throws IOException {
|
||||
// make sure we wrote something to the bloom before adding it
|
||||
if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) {
|
||||
bloomFilter.finalize();
|
||||
bloomFilter.compactBloom();
|
||||
if (this.bloomFilter.getMaxKeys() > 0) {
|
||||
int b = this.bloomFilter.getByteSize();
|
||||
int k = this.bloomFilter.getKeyCount();
|
||||
|
@ -876,13 +773,184 @@ public class StoreFile {
|
|||
k + "/" + m + " (" + NumberFormat.getPercentInstance().format(
|
||||
((double)k) / ((double)m)) + ")");
|
||||
}
|
||||
appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter());
|
||||
appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter());
|
||||
appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
|
||||
writer.appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter());
|
||||
writer.appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter());
|
||||
writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
|
||||
}
|
||||
super.close();
|
||||
writer.close();
|
||||
}
|
||||
|
||||
public void appendFileInfo(byte[] key, byte[] value) throws IOException {
|
||||
writer.appendFileInfo(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reader for a StoreFile.
|
||||
*/
|
||||
public static class Reader {
|
||||
static final Log LOG = LogFactory.getLog(Reader.class.getName());
|
||||
|
||||
protected BloomFilter bloomFilter = null;
|
||||
protected BloomType bloomFilterType;
|
||||
private final HFile.Reader reader;
|
||||
|
||||
public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
|
||||
throws IOException {
|
||||
reader = new HFile.Reader(fs, path, blockCache, inMemory);
|
||||
bloomFilterType = BloomType.NONE;
|
||||
}
|
||||
|
||||
public RawComparator<byte []> getComparator() {
|
||||
return reader.getComparator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a scanner to scan over this StoreFile.
|
||||
*
|
||||
* @param cacheBlocks should this scanner cache blocks?
|
||||
* @param pread use pread (for highly concurrent small readers)
|
||||
* @return a scanner
|
||||
*/
|
||||
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
|
||||
return new StoreFileScanner(this, getScanner(cacheBlocks, pread));
|
||||
}
|
||||
|
||||
/**
|
||||
* Warning: Do not write further code which depends on this call. Instead
|
||||
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
|
||||
* which is the preferred way to scan a store with higher level concepts.
|
||||
*
|
||||
* @param cacheBlocks should we cache the blocks?
|
||||
* @param pread use pread (for concurrent small readers)
|
||||
* @return the underlying HFileScanner
|
||||
*/
|
||||
@Deprecated
|
||||
public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
|
||||
return reader.getScanner(cacheBlocks, pread);
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
reader.close();
|
||||
}
|
||||
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
|
||||
if (this.bloomFilter == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
byte[] key;
|
||||
switch (this.bloomFilterType) {
|
||||
case ROW:
|
||||
key = row;
|
||||
break;
|
||||
case ROWCOL:
|
||||
if (columns.size() == 1) {
|
||||
byte[] col = columns.first();
|
||||
key = Bytes.add(row, col);
|
||||
break;
|
||||
}
|
||||
//$FALL-THROUGH$
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
|
||||
if (bloom != null) {
|
||||
return this.bloomFilter.contains(key, bloom);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error reading bloom filter data -- proceeding without",
|
||||
e);
|
||||
setBloomFilterFaulty();
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Bad bloom filter data -- proceeding without", e);
|
||||
setBloomFilterFaulty();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public Map<byte[], byte[]> loadFileInfo() throws IOException {
|
||||
Map<byte [], byte []> fi = reader.loadFileInfo();
|
||||
|
||||
byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
|
||||
if (b != null) {
|
||||
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
|
||||
}
|
||||
|
||||
return fi;
|
||||
}
|
||||
|
||||
public void loadBloomfilter() {
|
||||
if (this.bloomFilter != null) {
|
||||
return; // already loaded
|
||||
}
|
||||
|
||||
try {
|
||||
ByteBuffer b = reader.getMetaBlock(BLOOM_FILTER_META_KEY, false);
|
||||
if (b != null) {
|
||||
if (bloomFilterType == BloomType.NONE) {
|
||||
throw new IOException("valid bloom filter type not found in FileInfo");
|
||||
}
|
||||
|
||||
|
||||
this.bloomFilter = new ByteBloomFilter(b);
|
||||
LOG.info("Loaded " + (bloomFilterType== BloomType.ROW? "row":"col")
|
||||
+ " bloom filter metadata for " + reader.getName());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error reading bloom filter meta -- proceeding without", e);
|
||||
this.bloomFilter = null;
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Bad bloom filter meta -- proceeding without", e);
|
||||
this.bloomFilter = null;
|
||||
}
|
||||
}
|
||||
|
||||
public int getFilterEntries() {
|
||||
return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount()
|
||||
: reader.getFilterEntries();
|
||||
}
|
||||
|
||||
public ByteBuffer getMetaBlock(String bloomFilterDataKey, boolean cacheBlock) throws IOException {
|
||||
return reader.getMetaBlock(bloomFilterDataKey, cacheBlock);
|
||||
}
|
||||
|
||||
public void setBloomFilterFaulty() {
|
||||
bloomFilter = null;
|
||||
}
|
||||
|
||||
public byte[] getLastKey() {
|
||||
return reader.getLastKey();
|
||||
}
|
||||
|
||||
public byte[] midkey() throws IOException {
|
||||
return reader.midkey();
|
||||
}
|
||||
|
||||
public long length() {
|
||||
return reader.length();
|
||||
}
|
||||
|
||||
public int getEntries() {
|
||||
return reader.getEntries();
|
||||
}
|
||||
|
||||
public byte[] getFirstKey() {
|
||||
return reader.getFirstKey();
|
||||
}
|
||||
|
||||
public long indexSize() {
|
||||
return reader.indexSize();
|
||||
}
|
||||
|
||||
public BloomType getBloomFilterType() {
|
||||
return this.bloomFilterType;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -24,28 +24,31 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
|
||||
/**
|
||||
* A KeyValue scanner that iterates over a single HFile
|
||||
* KeyValueScanner adaptor over the Reader. It also provides hooks into
|
||||
* bloom filter things.
|
||||
*/
|
||||
class StoreFileScanner implements KeyValueScanner {
|
||||
static final Log LOG = LogFactory.getLog(Store.class);
|
||||
|
||||
private HFileScanner hfs;
|
||||
// the reader it comes from:
|
||||
private final StoreFile.Reader reader;
|
||||
private final HFileScanner hfs;
|
||||
private KeyValue cur = null;
|
||||
|
||||
/**
|
||||
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
|
||||
* @param hfs HFile scanner
|
||||
*/
|
||||
private StoreFileScanner(HFileScanner hfs) {
|
||||
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs) {
|
||||
this.reader = reader;
|
||||
this.hfs = hfs;
|
||||
}
|
||||
|
||||
|
@ -60,16 +63,12 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
List<StoreFileScanner> scanners =
|
||||
new ArrayList<StoreFileScanner>(filesToCompact.size());
|
||||
for (StoreFile file : filesToCompact) {
|
||||
Reader r = file.createReader();
|
||||
scanners.add(new StoreFileScanner(r.getScanner(cacheBlocks, usePread)));
|
||||
StoreFile.Reader r = file.createReader();
|
||||
scanners.add(r.getStoreFileScanner(cacheBlocks, usePread));
|
||||
}
|
||||
return scanners;
|
||||
}
|
||||
|
||||
public HFileScanner getHFileScanner() {
|
||||
return this.hfs;
|
||||
}
|
||||
|
||||
|
||||
public String toString() {
|
||||
return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
|
||||
}
|
||||
|
@ -131,4 +130,10 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
// Seeked to the exact key
|
||||
return true;
|
||||
}
|
||||
|
||||
// Bloom filter hook.
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
return reader.shouldSeek(row, columns);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,9 +155,9 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
|
||||
|
||||
// exclude scan files that have failed file filters
|
||||
for(StoreFileScanner sfs : sfScanners) {
|
||||
for (StoreFileScanner sfs : sfScanners) {
|
||||
if (isGet &&
|
||||
!sfs.getHFileScanner().shouldSeek(scan.getStartRow(), columns)) {
|
||||
!sfs.shouldSeek(scan.getStartRow(), columns)) {
|
||||
continue; // exclude this hfs
|
||||
}
|
||||
scanners.add(sfs);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -19,38 +19,37 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Defines the general behavior of a bloom filter.
|
||||
* <p>
|
||||
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
|
||||
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
|
||||
* the networking research community in the past decade thanks to the bandwidth efficiencies that it
|
||||
* offers for the transmission of set membership information between networked hosts. A sender encodes
|
||||
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
|
||||
* representation. Computation and space costs for construction are linear in the number of elements.
|
||||
* The receiver uses the filter to test whether various elements are members of the set. Though the
|
||||
* filter will occasionally return a false positive, it will never return a false negative. When creating
|
||||
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
|
||||
*
|
||||
* offers for the transmission of set membership information between networked hosts. A sender encodes
|
||||
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
|
||||
* representation. Computation and space costs for construction are linear in the number of elements.
|
||||
* The receiver uses the filter to test whether various elements are members of the set. Though the
|
||||
* filter will occasionally return a false positive, it will never return a false negative. When creating
|
||||
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
|
||||
*
|
||||
* <p>
|
||||
* Originally created by
|
||||
* <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
|
||||
*
|
||||
*
|
||||
* <p>
|
||||
* It must be extended in order to define the real behavior.
|
||||
*/
|
||||
public interface BloomFilter {
|
||||
/**
|
||||
* Allocate memory for the bloom filter data. Note that bloom data isn't
|
||||
* allocated by default because it can grow large & reads would be better
|
||||
/**
|
||||
* Allocate memory for the bloom filter data. Note that bloom data isn't
|
||||
* allocated by default because it can grow large & reads would be better
|
||||
* managed by the LRU cache.
|
||||
*/
|
||||
void allocBloom();
|
||||
|
||||
|
||||
/**
|
||||
* Add the specified binary to the bloom filter.
|
||||
*
|
||||
|
@ -86,7 +85,7 @@ public interface BloomFilter {
|
|||
* @return true if matched by bloom, false if not
|
||||
*/
|
||||
boolean contains(byte [] buf, int offset, int length, ByteBuffer bloom);
|
||||
|
||||
|
||||
/**
|
||||
* @return The number of keys added to the bloom
|
||||
*/
|
||||
|
@ -97,16 +96,16 @@ public interface BloomFilter {
|
|||
* to maintain the desired error rate
|
||||
*/
|
||||
public int getMaxKeys();
|
||||
|
||||
|
||||
/**
|
||||
* Size of the bloom, in bytes
|
||||
* @return Size of the bloom, in bytes
|
||||
*/
|
||||
public int getByteSize();
|
||||
|
||||
|
||||
/**
|
||||
* Finalize the bloom before writing metadata & data to disk
|
||||
* Compact the bloom before writing metadata & data to disk
|
||||
*/
|
||||
void finalize();
|
||||
void compactBloom();
|
||||
|
||||
/**
|
||||
* Get a writable interface into bloom filter meta data.
|
||||
|
|
|
@ -1,35 +1,6 @@
|
|||
/**
|
||||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
|
||||
* All rights reserved.
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
* - Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* - Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in
|
||||
* the documentation and/or other materials provided with the distribution.
|
||||
* - Neither the name of the University Catholique de Louvain - UCL
|
||||
* nor the names of its contributors may be used to endorse or
|
||||
* promote products derived from this software without specific prior
|
||||
* written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
||||
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
||||
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
||||
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -48,41 +19,38 @@
|
|||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.io.DataOutput;
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
import java.lang.Math;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.util.bloom.Filter;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Implements a <i>Bloom filter</i>, as defined by Bloom in 1970.
|
||||
* <p>
|
||||
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
|
||||
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
|
||||
* the networking research community in the past decade thanks to the bandwidth efficiencies that it
|
||||
* offers for the transmission of set membership information between networked hosts. A sender encodes
|
||||
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
|
||||
* representation. Computation and space costs for construction are linear in the number of elements.
|
||||
* The receiver uses the filter to test whether various elements are members of the set. Though the
|
||||
* filter will occasionally return a false positive, it will never return a false negative. When creating
|
||||
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
|
||||
*
|
||||
* offers for the transmission of set membership information between networked hosts. A sender encodes
|
||||
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
|
||||
* representation. Computation and space costs for construction are linear in the number of elements.
|
||||
* The receiver uses the filter to test whether various elements are members of the set. Though the
|
||||
* filter will occasionally return a false positive, it will never return a false negative. When creating
|
||||
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
|
||||
*
|
||||
* <p>
|
||||
* Originally created by
|
||||
* Originally inspired by
|
||||
* <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
|
||||
*
|
||||
*
|
||||
* @see BloomFilter The general behavior of a filter
|
||||
*
|
||||
*
|
||||
* @see <a href="http://portal.acm.org/citation.cfm?id=362692&dl=ACM&coll=portal">Space/Time Trade-Offs in Hash Coding with Allowable Errors</a>
|
||||
*/
|
||||
public class ByteBloomFilter implements BloomFilter {
|
||||
/** Current file format version */
|
||||
public static final int VERSION = 1;
|
||||
|
||||
|
||||
/** Bytes (B) in the array */
|
||||
protected int byteSize;
|
||||
/** Number of hash functions */
|
||||
|
@ -97,7 +65,7 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
protected int maxKeys;
|
||||
/** Bloom bits */
|
||||
protected ByteBuffer bloom;
|
||||
|
||||
|
||||
/** Bit-value lookup array to prevent doing the same work over and over */
|
||||
private static final byte [] bitvals = {
|
||||
(byte) 0x01,
|
||||
|
@ -115,11 +83,11 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
* @param meta stored bloom meta data
|
||||
* @throws IllegalArgumentException meta data is invalid
|
||||
*/
|
||||
public ByteBloomFilter(ByteBuffer meta)
|
||||
throws IllegalArgumentException {
|
||||
public ByteBloomFilter(ByteBuffer meta)
|
||||
throws IllegalArgumentException {
|
||||
int version = meta.getInt();
|
||||
if (version != VERSION) throw new IllegalArgumentException("Bad version");
|
||||
|
||||
|
||||
this.byteSize = meta.getInt();
|
||||
this.hashCount = meta.getInt();
|
||||
this.hashType = meta.getInt();
|
||||
|
@ -136,30 +104,30 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
* @param maxKeys Maximum expected number of keys that will be stored in this bloom
|
||||
* @param errorRate Desired false positive error rate. Lower rate = more storage required
|
||||
* @param hashType Type of hash function to use
|
||||
* @param foldFactor When finished adding entries, you may be able to 'fold'
|
||||
* this bloom to save space. Tradeoff potentially excess bytes in bloom for
|
||||
* @param foldFactor When finished adding entries, you may be able to 'fold'
|
||||
* this bloom to save space. Tradeoff potentially excess bytes in bloom for
|
||||
* ability to fold if keyCount is exponentially greater than maxKeys.
|
||||
* @throws IllegalArgumentException
|
||||
*/
|
||||
public ByteBloomFilter(int maxKeys, float errorRate, int hashType, int foldFactor)
|
||||
throws IllegalArgumentException {
|
||||
/*
|
||||
* Bloom filters are very sensitive to the number of elements inserted
|
||||
* into them. For HBase, the number of entries depends on the size of the
|
||||
* data stored in the column. Currently the default region size is 256MB,
|
||||
* so entry count ~= 256MB / (average value size for column). Despite
|
||||
* this rule of thumb, there is no efficient way to calculate the entry
|
||||
* count after compactions. Therefore, it is often easier to use a
|
||||
throws IllegalArgumentException {
|
||||
/*
|
||||
* Bloom filters are very sensitive to the number of elements inserted
|
||||
* into them. For HBase, the number of entries depends on the size of the
|
||||
* data stored in the column. Currently the default region size is 256MB,
|
||||
* so entry count ~= 256MB / (average value size for column). Despite
|
||||
* this rule of thumb, there is no efficient way to calculate the entry
|
||||
* count after compactions. Therefore, it is often easier to use a
|
||||
* dynamic bloom filter that will add extra space instead of allowing the
|
||||
* error rate to grow.
|
||||
*
|
||||
*
|
||||
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
|
||||
*
|
||||
* m denotes the number of bits in the Bloom filter (bitSize)
|
||||
* n denotes the number of elements inserted into the Bloom filter (maxKeys)
|
||||
* k represents the number of hash functions used (nbHash)
|
||||
* e represents the desired false positive rate for the bloom (err)
|
||||
*
|
||||
*
|
||||
* If we fix the error rate (e) and know the number of entries, then
|
||||
* the optimal bloom size m = -(n * ln(err) / (ln(2)^2)
|
||||
* ~= n * ln(err) / ln(0.6185)
|
||||
|
@ -196,12 +164,12 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
this.bloom = ByteBuffer.allocate(this.byteSize);
|
||||
assert this.bloom.hasArray();
|
||||
}
|
||||
|
||||
|
||||
void sanityCheck() throws IllegalArgumentException {
|
||||
if(this.byteSize <= 0) {
|
||||
throw new IllegalArgumentException("maxValue must be > 0");
|
||||
}
|
||||
|
||||
|
||||
if(this.hashCount <= 0) {
|
||||
throw new IllegalArgumentException("Hash function count must be > 0");
|
||||
}
|
||||
|
@ -209,12 +177,12 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
if (this.hash == null) {
|
||||
throw new IllegalArgumentException("hashType must be known");
|
||||
}
|
||||
|
||||
|
||||
if (this.keyCount < 0) {
|
||||
throw new IllegalArgumentException("must have positive keyCount");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void bloomCheck(ByteBuffer bloom) throws IllegalArgumentException {
|
||||
if (this.byteSize != bloom.limit()) {
|
||||
throw new IllegalArgumentException(
|
||||
|
@ -243,14 +211,14 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
|
||||
++this.keyCount;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Should only be used in tests when writing a bloom filter.
|
||||
*/
|
||||
boolean contains(byte [] buf) {
|
||||
return contains(buf, 0, buf.length, this.bloom);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Should only be used in tests when writing a bloom filter.
|
||||
*/
|
||||
|
@ -264,7 +232,7 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(byte [] buf, int offset, int length,
|
||||
public boolean contains(byte [] buf, int offset, int length,
|
||||
ByteBuffer theBloom) {
|
||||
|
||||
if(theBloom.limit() != this.byteSize) {
|
||||
|
@ -282,11 +250,11 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
/** Private helpers */
|
||||
|
||||
/**
|
||||
|
||||
/**
|
||||
* Set the bit at the specified index to 1.
|
||||
*
|
||||
* @param pos index of bit
|
||||
|
@ -298,7 +266,7 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
curByte |= bitvals[bitPos];
|
||||
bloom.put(bytePos, curByte);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check if bit at specified index is 1.
|
||||
*
|
||||
|
@ -312,37 +280,37 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
curByte &= bitvals[bitPos];
|
||||
return (curByte != 0);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getKeyCount() {
|
||||
return this.keyCount;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getMaxKeys() {
|
||||
return this.maxKeys;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getByteSize() {
|
||||
return this.byteSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalize() {
|
||||
public void compactBloom() {
|
||||
// see if the actual size is exponentially smaller than expected.
|
||||
if (this.keyCount > 0 && this.bloom.hasArray()) {
|
||||
int pieces = 1;
|
||||
int newByteSize = this.byteSize;
|
||||
int newMaxKeys = this.maxKeys;
|
||||
|
||||
|
||||
// while exponentially smaller & folding is lossless
|
||||
while ( (newByteSize & 1) == 0 && newMaxKeys > (this.keyCount<<1) ) {
|
||||
pieces <<= 1;
|
||||
newByteSize >>= 1;
|
||||
newMaxKeys >>= 1;
|
||||
}
|
||||
|
||||
|
||||
// if we should fold these into pieces
|
||||
if (pieces > 1) {
|
||||
byte[] array = this.bloom.array();
|
||||
|
@ -351,7 +319,7 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
int off = end;
|
||||
for(int p = 1; p < pieces; ++p) {
|
||||
for(int pos = start; pos < end; ++pos) {
|
||||
array[pos] |= array[off++];
|
||||
array[pos] |= array[off++];
|
||||
}
|
||||
}
|
||||
// folding done, only use a subset of this array
|
||||
|
@ -388,9 +356,9 @@ public class ByteBloomFilter implements BloomFilter {
|
|||
public Writable getDataWriter() {
|
||||
return new DataWriter();
|
||||
}
|
||||
|
||||
|
||||
private class MetaWriter implements Writable {
|
||||
protected MetaWriter() {}
|
||||
protected MetaWriter() {}
|
||||
@Override
|
||||
public void readFields(DataInput arg0) throws IOException {
|
||||
throw new IOException("Cant read with this class.");
|
||||
|
|
|
@ -1,35 +1,6 @@
|
|||
/**
|
||||
/*
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
|
||||
* All rights reserved.
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
* - Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* - Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in
|
||||
* the documentation and/or other materials provided with the distribution.
|
||||
* - Neither the name of the University Catholique de Louvain - UCL
|
||||
* nor the names of its contributors may be used to endorse or
|
||||
* promote products derived from this software without specific prior
|
||||
* written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
||||
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
||||
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
||||
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -49,30 +20,30 @@
|
|||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Implements a <i>dynamic Bloom filter</i>, as defined in the INFOCOM 2006 paper.
|
||||
* <p>
|
||||
* A dynamic Bloom filter (DBF) makes use of a <code>s * m</code> bit matrix but
|
||||
* each of the <code>s</code> rows is a standard Bloom filter. The creation
|
||||
* each of the <code>s</code> rows is a standard Bloom filter. The creation
|
||||
* process of a DBF is iterative. At the start, the DBF is a <code>1 * m</code>
|
||||
* bit matrix, i.e., it is composed of a single standard Bloom filter.
|
||||
* It assumes that <code>n<sub>r</sub></code> elements are recorded in the
|
||||
* It assumes that <code>n<sub>r</sub></code> elements are recorded in the
|
||||
* initial bit vector, where <code>n<sub>r</sub> <= n</code> (<code>n</code> is
|
||||
* the cardinality of the set <code>A</code> to record in the filter).
|
||||
* the cardinality of the set <code>A</code> to record in the filter).
|
||||
* <p>
|
||||
* As the size of <code>A</code> grows during the execution of the application,
|
||||
* several keys must be inserted in the DBF. When inserting a key into the DBF,
|
||||
* one must first get an active Bloom filter in the matrix. A Bloom filter is
|
||||
* active when the number of recorded keys, <code>n<sub>r</sub></code>, is
|
||||
* active when the number of recorded keys, <code>n<sub>r</sub></code>, is
|
||||
* strictly less than the current cardinality of <code>A</code>, <code>n</code>.
|
||||
* If an active Bloom filter is found, the key is inserted and
|
||||
* If an active Bloom filter is found, the key is inserted and
|
||||
* <code>n<sub>r</sub></code> is incremented by one. On the other hand, if there
|
||||
* is no active Bloom filter, a new one is created (i.e., a new row is added to
|
||||
* the matrix) according to the current size of <code>A</code> and the element
|
||||
|
@ -84,7 +55,7 @@ import org.apache.hadoop.io.Writable;
|
|||
* <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
|
||||
*
|
||||
* @see BloomFilter A Bloom filter
|
||||
*
|
||||
*
|
||||
* @see <a href="http://www.cse.fau.edu/~jie/research/publications/Publication_files/infocom2006.pdf">Theory and Network Applications of Dynamic Bloom Filters</a>
|
||||
*/
|
||||
public class DynamicByteBloomFilter implements BloomFilter {
|
||||
|
@ -108,8 +79,7 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
* @param meta stored bloom meta data
|
||||
* @throws IllegalArgumentException meta data is invalid
|
||||
*/
|
||||
public DynamicByteBloomFilter(ByteBuffer meta)
|
||||
throws IllegalArgumentException {
|
||||
public DynamicByteBloomFilter(ByteBuffer meta) throws IllegalArgumentException {
|
||||
int version = meta.getInt();
|
||||
if (version != VERSION) throw new IllegalArgumentException("Bad version");
|
||||
|
||||
|
@ -118,7 +88,7 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
this.hashType = meta.getInt();
|
||||
this.readMatrixSize = meta.getInt();
|
||||
this.curKeys = meta.getInt();
|
||||
|
||||
|
||||
readSanityCheck();
|
||||
|
||||
this.matrix = new ByteBloomFilter[1];
|
||||
|
@ -126,12 +96,9 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
}
|
||||
|
||||
/**
|
||||
* Normal write constructor. Note that this doesn't allocate bloom data by
|
||||
* Normal write constructor. Note that this doesn't allocate bloom data by
|
||||
* default. Instead, call allocBloom() before adding entries.
|
||||
* @param bitSize The vector size of <i>this</i> filter.
|
||||
* @param functionCount The number of hash function to consider.
|
||||
* @param hashType type of the hashing function (see
|
||||
* {@link org.apache.hadoop.util.hash.Hash}).
|
||||
* @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}).
|
||||
* @param keyInterval Maximum number of keys to record per Bloom filter row.
|
||||
* @throws IllegalArgumentException The input parameters were invalid
|
||||
*/
|
||||
|
@ -141,7 +108,7 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
this.errorRate = errorRate;
|
||||
this.hashType = hashType;
|
||||
this.curKeys = 0;
|
||||
|
||||
|
||||
if(keyInterval <= 0) {
|
||||
throw new IllegalArgumentException("keyCount must be > 0");
|
||||
}
|
||||
|
@ -164,7 +131,7 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
throw new IllegalArgumentException("matrix size must be known");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void add(byte []buf, int offset, int len) {
|
||||
BloomFilter bf = getCurBloom();
|
||||
|
@ -207,17 +174,17 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
public boolean contains(byte [] buf, ByteBuffer theBloom) {
|
||||
return contains(buf, 0, buf.length, theBloom);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean contains(byte[] buf, int offset, int length,
|
||||
public boolean contains(byte[] buf, int offset, int length,
|
||||
ByteBuffer theBloom) {
|
||||
if(offset + length > buf.length) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
// current version assumes uniform size
|
||||
int bytesPerBloom = this.matrix[0].getByteSize();
|
||||
|
||||
int bytesPerBloom = this.matrix[0].getByteSize();
|
||||
|
||||
if(theBloom.limit() != bytesPerBloom * readMatrixSize) {
|
||||
throw new IllegalArgumentException("Bloom does not match expected size");
|
||||
}
|
||||
|
@ -233,7 +200,7 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// matched no bloom filters
|
||||
return false;
|
||||
}
|
||||
|
@ -251,14 +218,14 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
public int getMaxKeys() {
|
||||
return bloomCount() * this.keyInterval;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getByteSize() {
|
||||
return bloomCount() * this.matrix[0].getByteSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalize() {
|
||||
public void compactBloom() {
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -298,9 +265,9 @@ public class DynamicByteBloomFilter implements BloomFilter {
|
|||
public Writable getDataWriter() {
|
||||
return new DataWriter();
|
||||
}
|
||||
|
||||
|
||||
private class MetaWriter implements Writable {
|
||||
protected MetaWriter() {}
|
||||
protected MetaWriter() {}
|
||||
@Override
|
||||
public void readFields(DataInput arg0) throws IOException {
|
||||
throw new IOException("Cant read with this class.");
|
||||
|
|
|
@ -57,9 +57,9 @@ import org.junit.Test;
|
|||
*/
|
||||
public class TestFSErrorsExposed {
|
||||
private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class);
|
||||
|
||||
|
||||
HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
|
||||
|
||||
/**
|
||||
* Injects errors into the pread calls of an on-disk file, and makes
|
||||
* sure those bubble up to the HFile scanner
|
||||
|
@ -73,21 +73,21 @@ public class TestFSErrorsExposed {
|
|||
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2*1024);
|
||||
TestStoreFile.writeStoreFile(
|
||||
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
|
||||
|
||||
|
||||
StoreFile sf = new StoreFile(fs, writer.getPath(), false,
|
||||
util.getConfiguration(), StoreFile.BloomType.NONE, false);
|
||||
HFile.Reader reader = sf.createReader();
|
||||
StoreFile.Reader reader = sf.createReader();
|
||||
HFileScanner scanner = reader.getScanner(false, true);
|
||||
|
||||
|
||||
FaultyInputStream inStream = fs.inStreams.get(0).get();
|
||||
assertNotNull(inStream);
|
||||
|
||||
|
||||
scanner.seekTo();
|
||||
// Do at least one successful read
|
||||
assertTrue(scanner.next());
|
||||
|
||||
|
||||
inStream.startFaults();
|
||||
|
||||
|
||||
try {
|
||||
int scanned=0;
|
||||
while (scanner.next()) {
|
||||
|
@ -100,7 +100,7 @@ public class TestFSErrorsExposed {
|
|||
}
|
||||
reader.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Injects errors into the pread calls of an on-disk file, and makes
|
||||
* sure those bubble up to the StoreFileScanner
|
||||
|
@ -111,25 +111,25 @@ public class TestFSErrorsExposed {
|
|||
HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
|
||||
"regionname"), "familyname");
|
||||
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
|
||||
HFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024);
|
||||
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024);
|
||||
TestStoreFile.writeStoreFile(
|
||||
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
|
||||
|
||||
|
||||
StoreFile sf = new StoreFile(fs, writer.getPath(), false,
|
||||
util.getConfiguration(), BloomType.NONE, false);
|
||||
List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
|
||||
Collections.singletonList(sf), false, true);
|
||||
KeyValueScanner scanner = scanners.get(0);
|
||||
|
||||
|
||||
FaultyInputStream inStream = fs.inStreams.get(0).get();
|
||||
assertNotNull(inStream);
|
||||
|
||||
|
||||
scanner.seek(KeyValue.LOWESTKEY);
|
||||
// Do at least one successful read
|
||||
assertNotNull(scanner.next());
|
||||
|
||||
|
||||
inStream.startFaults();
|
||||
|
||||
|
||||
try {
|
||||
int scanned=0;
|
||||
while (scanner.next() != null) {
|
||||
|
@ -142,7 +142,7 @@ public class TestFSErrorsExposed {
|
|||
}
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Cluster test which starts a region server with a region, then
|
||||
* removes the data from HDFS underneath it, and ensures that
|
||||
|
@ -154,25 +154,25 @@ public class TestFSErrorsExposed {
|
|||
util.startMiniCluster(1);
|
||||
byte[] tableName = Bytes.toBytes("table");
|
||||
byte[] fam = Bytes.toBytes("fam");
|
||||
|
||||
|
||||
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(new HColumnDescriptor(
|
||||
fam, 1, HColumnDescriptor.DEFAULT_COMPRESSION,
|
||||
false, false, HConstants.FOREVER, "NONE"));
|
||||
admin.createTable(desc);
|
||||
|
||||
|
||||
HTable table = new HTable(tableName);
|
||||
|
||||
|
||||
// Load some data
|
||||
util.loadTable(table, fam);
|
||||
table.flushCommits();
|
||||
util.flush();
|
||||
util.countRows(table);
|
||||
|
||||
|
||||
// Kill the DFS cluster
|
||||
util.getDFSCluster().shutdownDataNodes();
|
||||
|
||||
|
||||
try {
|
||||
util.countRows(table);
|
||||
fail("Did not fail to count after removing data");
|
||||
|
@ -180,16 +180,16 @@ public class TestFSErrorsExposed {
|
|||
LOG.info("Got expected error", e);
|
||||
assertTrue(e.getMessage().contains("Could not seek"));
|
||||
}
|
||||
|
||||
|
||||
} finally {
|
||||
util.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class FaultyFileSystem extends FilterFileSystem {
|
||||
List<SoftReference<FaultyInputStream>> inStreams =
|
||||
new ArrayList<SoftReference<FaultyInputStream>>();
|
||||
|
||||
|
||||
public FaultyFileSystem(FileSystem testFileSystem) {
|
||||
super(testFileSystem);
|
||||
}
|
||||
|
@ -202,16 +202,16 @@ public class TestFSErrorsExposed {
|
|||
return faulty;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class FaultyInputStream extends FSDataInputStream {
|
||||
boolean faultsStarted = false;
|
||||
|
||||
|
||||
public FaultyInputStream(InputStream in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
public void startFaults() {
|
||||
faultsStarted = true;
|
||||
faultsStarted = true;
|
||||
}
|
||||
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
|
@ -219,7 +219,7 @@ public class TestFSErrorsExposed {
|
|||
injectFault();
|
||||
return ((PositionedReadable)in).read(position, buffer, offset, length);
|
||||
}
|
||||
|
||||
|
||||
private void injectFault() throws IOException {
|
||||
if (faultsStarted) {
|
||||
throw new IOException("Fault injected");
|
||||
|
@ -227,5 +227,5 @@ public class TestFSErrorsExposed {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -82,14 +82,14 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
*/
|
||||
public void testBasicHalfMapFile() throws Exception {
|
||||
// Make up a directory hierarchy that has a regiondir and familyname.
|
||||
HFile.Writer writer = StoreFile.createWriter(this.fs,
|
||||
StoreFile.Writer writer = StoreFile.createWriter(this.fs,
|
||||
new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024);
|
||||
writeStoreFile(writer);
|
||||
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf,
|
||||
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf,
|
||||
StoreFile.BloomType.NONE, false));
|
||||
}
|
||||
|
||||
private void writeStoreFile(final HFile.Writer writer) throws IOException {
|
||||
private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
|
||||
writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
|
||||
}
|
||||
/*
|
||||
|
@ -98,7 +98,7 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
* @param writer
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void writeStoreFile(final HFile.Writer writer, byte[] fam, byte[] qualifier)
|
||||
public static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier)
|
||||
throws IOException {
|
||||
long now = System.currentTimeMillis();
|
||||
try {
|
||||
|
@ -123,11 +123,11 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname");
|
||||
Path dir = new Path(storedir, "1234567890");
|
||||
// Make a store file and write data to it.
|
||||
HFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
|
||||
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
|
||||
writeStoreFile(writer);
|
||||
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
|
||||
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
|
||||
StoreFile.BloomType.NONE, false);
|
||||
HFile.Reader reader = hsf.createReader();
|
||||
StoreFile.Reader reader = hsf.createReader();
|
||||
// Split on a row, not in middle of row. Midkey returned by reader
|
||||
// may be in middle of row. Create new one with empty column and
|
||||
// timestamp.
|
||||
|
@ -137,7 +137,7 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
byte [] finalRow = kv.getRow();
|
||||
// Make a reference
|
||||
Path refPath = StoreFile.split(fs, dir, hsf, midRow, Range.top);
|
||||
StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf,
|
||||
StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false);
|
||||
// Now confirm that I can read from the reference and that it only gets
|
||||
// keys from top half of the file.
|
||||
|
@ -174,9 +174,9 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
Path bottomPath = StoreFile.split(this.fs, bottomDir,
|
||||
f, midRow, Range.bottom);
|
||||
// Make readers on top and bottom.
|
||||
HFile.Reader top = new StoreFile(this.fs, topPath, true, conf,
|
||||
StoreFile.Reader top = new StoreFile(this.fs, topPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false).createReader();
|
||||
HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf,
|
||||
StoreFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false).createReader();
|
||||
ByteBuffer previous = null;
|
||||
LOG.info("Midkey: " + midKV.toString());
|
||||
|
@ -229,9 +229,9 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top);
|
||||
bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey,
|
||||
Range.bottom);
|
||||
top = new StoreFile(this.fs, topPath, true, conf,
|
||||
top = new StoreFile(this.fs, topPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false).createReader();
|
||||
bottom = new StoreFile(this.fs, bottomPath, true, conf,
|
||||
bottom = new StoreFile(this.fs, bottomPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false).createReader();
|
||||
bottomScanner = bottom.getScanner(false, false);
|
||||
int count = 0;
|
||||
|
@ -250,7 +250,6 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
assertTrue(topScanner.getReader().getComparator().compare(key.array(),
|
||||
key.arrayOffset(), key.limit(), badmidkey, 0, badmidkey.length) >= 0);
|
||||
if (first) {
|
||||
first = false;
|
||||
first = false;
|
||||
KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
|
||||
LOG.info("First top when key < bottom: " + keyKV);
|
||||
|
@ -275,9 +274,9 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top);
|
||||
bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey,
|
||||
Range.bottom);
|
||||
top = new StoreFile(this.fs, topPath, true, conf,
|
||||
top = new StoreFile(this.fs, topPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false).createReader();
|
||||
bottom = new StoreFile(this.fs, bottomPath, true, conf,
|
||||
bottom = new StoreFile(this.fs, bottomPath, true, conf,
|
||||
StoreFile.BloomType.NONE, false).createReader();
|
||||
first = true;
|
||||
bottomScanner = bottom.getScanner(false, false);
|
||||
|
@ -317,44 +316,44 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
fs.delete(f.getPath(), true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static String ROOT_DIR =
|
||||
HBaseTestingUtility.getTestDir("TestStoreFile").toString();
|
||||
private static String localFormatter = "%010d";
|
||||
|
||||
|
||||
public void testBloomFilter() throws Exception {
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
conf.setFloat("io.hfile.bloom.error.rate", (float)0.01);
|
||||
conf.setBoolean("io.hfile.bloom.enabled", true);
|
||||
|
||||
|
||||
// write the file
|
||||
Path f = new Path(ROOT_DIR, getName());
|
||||
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
|
||||
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
|
||||
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
|
||||
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
|
||||
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
for (int i = 0; i < 2000; i += 2) {
|
||||
String row = String.format(localFormatter, Integer.valueOf(i));
|
||||
String row = String.format(localFormatter, i);
|
||||
KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
|
||||
"col".getBytes(), now, "value".getBytes());
|
||||
writer.append(kv);
|
||||
}
|
||||
writer.close();
|
||||
|
||||
|
||||
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
|
||||
reader.loadFileInfo();
|
||||
reader.loadBloomfilter();
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
|
||||
|
||||
// check false positives rate
|
||||
int falsePos = 0;
|
||||
int falseNeg = 0;
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
String row = String.format(localFormatter, Integer.valueOf(i));
|
||||
String row = String.format(localFormatter, i);
|
||||
TreeSet<byte[]> columns = new TreeSet<byte[]>();
|
||||
columns.add("family:col".getBytes());
|
||||
|
||||
|
||||
boolean exists = scanner.shouldSeek(row.getBytes(), columns);
|
||||
if (i % 2 == 0) {
|
||||
if (!exists) falseNeg++;
|
||||
|
@ -369,19 +368,19 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
System.out.println("False positives: " + falsePos);
|
||||
assertTrue(falsePos < 2);
|
||||
}
|
||||
|
||||
|
||||
public void testBloomTypes() throws Exception {
|
||||
float err = (float) 0.01;
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
conf.setFloat("io.hfile.bloom.error.rate", err);
|
||||
conf.setBoolean("io.hfile.bloom.enabled", true);
|
||||
|
||||
|
||||
int rowCount = 50;
|
||||
int colCount = 10;
|
||||
int versions = 2;
|
||||
|
||||
|
||||
// run once using columns and once using rows
|
||||
StoreFile.BloomType[] bt =
|
||||
StoreFile.BloomType[] bt =
|
||||
{StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW};
|
||||
int[] expKeys = {rowCount*colCount, rowCount};
|
||||
// below line deserves commentary. it is expected bloom false positives
|
||||
|
@ -393,19 +392,19 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
for (int x : new int[]{0,1}) {
|
||||
// write the file
|
||||
Path f = new Path(ROOT_DIR, getName());
|
||||
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
|
||||
StoreFile.DEFAULT_BLOCKSIZE_SMALL,
|
||||
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
|
||||
StoreFile.DEFAULT_BLOCKSIZE_SMALL,
|
||||
HFile.DEFAULT_COMPRESSION_ALGORITHM,
|
||||
conf, KeyValue.COMPARATOR, bt[x], expKeys[x]);
|
||||
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
for (int i = 0; i < rowCount*2; i += 2) { // rows
|
||||
for (int j = 0; j < colCount*2; j += 2) { // column qualifiers
|
||||
String row = String.format(localFormatter, Integer.valueOf(i));
|
||||
String col = String.format(localFormatter, Integer.valueOf(j));
|
||||
for (int k= 0; k < versions; ++k) { // versions
|
||||
KeyValue kv = new KeyValue(row.getBytes(),
|
||||
"family".getBytes(), ("col" + col).getBytes(),
|
||||
String row = String.format(localFormatter, i);
|
||||
String col = String.format(localFormatter, j);
|
||||
for (int k= 0; k < versions; ++k) { // versions
|
||||
KeyValue kv = new KeyValue(row.getBytes(),
|
||||
"family".getBytes(), ("col" + col).getBytes(),
|
||||
now-k, Bytes.toBytes((long)-1));
|
||||
writer.append(kv);
|
||||
}
|
||||
|
@ -416,16 +415,16 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
|
||||
reader.loadFileInfo();
|
||||
reader.loadBloomfilter();
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
assertEquals(expKeys[x], reader.getBloomFilter().getKeyCount());
|
||||
|
||||
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
|
||||
assertEquals(expKeys[x], reader.bloomFilter.getKeyCount());
|
||||
|
||||
// check false positives rate
|
||||
int falsePos = 0;
|
||||
int falseNeg = 0;
|
||||
for (int i = 0; i < rowCount*2; ++i) { // rows
|
||||
for (int j = 0; j < colCount*2; ++j) { // column qualifiers
|
||||
String row = String.format(localFormatter, Integer.valueOf(i));
|
||||
String col = String.format(localFormatter, Integer.valueOf(j));
|
||||
String row = String.format(localFormatter, i);
|
||||
String col = String.format(localFormatter, j);
|
||||
TreeSet<byte[]> columns = new TreeSet<byte[]>();
|
||||
columns.add(("col" + col).getBytes());
|
||||
|
||||
|
@ -448,7 +447,6 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
assertEquals(0, falseNeg);
|
||||
assertTrue(falsePos < 2*expErr[x]);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testFlushTimeComparator() {
|
||||
|
|
Loading…
Reference in New Issue