HBASE-1200 Add bloomfilters

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@946464 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-05-19 22:19:24 +00:00
parent 8d2b283083
commit a8806266b8
28 changed files with 883 additions and 271 deletions
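For orientation: a minimal sketch, not part of this commit, of how a client could use the per-family bloom filter setting it introduces. The table and family names below are hypothetical; HColumnDescriptor.setBloomFilterType and StoreFile.BloomType are the APIs added in the diff that follows.

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

public class BloomFilterSchemaSketch {
  public static HTableDescriptor exampleTable() {
    // "example_table" and "f" are made-up names for illustration only.
    HTableDescriptor htd = new HTableDescriptor("example_table");
    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("f"));
    // Version 8 of HColumnDescriptor replaces the old boolean flag with an enum:
    // NONE (default), ROW (row key only), or ROWCOL (row + column qualifier).
    hcd.setBloomFilterType(StoreFile.BloomType.ROW);
    htd.addFamily(hcd);
    return htd;
  }
}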

.gitignore

@ -12,3 +12,5 @@
/core/build/
/core/test/
*.iml
*.orig
*~

CHANGES.txt

@ -653,6 +653,7 @@ Release 0.21.0 - Unreleased
HBASE-2529 Make OldLogsCleaner easier to extend
HBASE-2527 Add the ability to easily extend some HLog actions
HBASE-2559 Set hbase.hregion.majorcompaction to 0 to disable
HBASE-1200 Add bloomfilters (Nicolas Spiegelberg via Stack)
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

HColumnDescriptor.java

@ -29,6 +29,8 @@ import java.util.Map;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@ -50,7 +52,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
// Version 5 was when bloom filter descriptors were removed.
// Version 6 adds metadata as a map where keys and values are byte[].
// Version 7 -- add new compression and hfile blocksize to HColumnDescriptor (HBASE-1217)
private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)7;
// Version 8 -- reintroduction of bloom filters, changed from boolean to enum
private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)8;
/**
* The type of compression.
@ -113,7 +116,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
/**
* Default setting for whether or not to use bloomfilters.
*/
public static final boolean DEFAULT_BLOOMFILTER = false;
public static final String DEFAULT_BLOOMFILTER = StoreFile.BloomType.NONE.toString();
/**
* Default time to live of cell contents.
@ -166,7 +169,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
this (familyName == null || familyName.length <= 0?
HConstants.EMPTY_BYTE_ARRAY: familyName, DEFAULT_VERSIONS,
DEFAULT_COMPRESSION, DEFAULT_IN_MEMORY, DEFAULT_BLOCKCACHE,
DEFAULT_TTL, false);
DEFAULT_TTL, DEFAULT_BLOOMFILTER);
}
/**
@ -195,7 +198,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
* @param blockCacheEnabled If true, MapFile blocks should be cached
* @param timeToLive Time-to-live of cell contents, in seconds
* (use HConstants.FOREVER for unlimited TTL)
* @param bloomFilter Enable the specified bloom filter for this column
* @param bloomFilter Bloom filter type for this column
*
* @throws IllegalArgumentException if passed a family name that is made of
* other than 'word' characters: i.e. <code>[a-zA-Z_0-9]</code> or contains
@ -205,7 +208,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public HColumnDescriptor(final byte [] familyName, final int maxVersions,
final String compression, final boolean inMemory,
final boolean blockCacheEnabled,
final int timeToLive, final boolean bloomFilter) {
final int timeToLive, final String bloomFilter) {
this(familyName, maxVersions, compression, inMemory, blockCacheEnabled,
DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE);
}
@ -222,7 +225,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
* @param blocksize
* @param timeToLive Time-to-live of cell contents, in seconds
* (use HConstants.FOREVER for unlimited TTL)
* @param bloomFilter Enable the specified bloom filter for this column
* @param bloomFilter Bloom filter type for this column
* @param scope The scope tag for this column
*
* @throws IllegalArgumentException if passed a family name that is made of
@ -233,7 +236,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public HColumnDescriptor(final byte [] familyName, final int maxVersions,
final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final boolean bloomFilter, final int scope) {
final int timeToLive, final String bloomFilter, final int scope) {
isLegalFamilyName(familyName);
this.name = familyName;
@ -248,7 +251,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setTimeToLive(timeToLive);
setCompressionType(Compression.Algorithm.
valueOf(compression.toUpperCase()));
setBloomfilter(bloomFilter);
setBloomFilterType(StoreFile.BloomType.
valueOf(bloomFilter.toUpperCase()));
setBlocksize(blocksize);
setScope(scope);
}
@ -464,20 +468,21 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
}
/**
* @return true if a bloom filter is enabled
* @return bloom filter type used for new StoreFiles in this column family
*/
public boolean isBloomfilter() {
String value = getValue(BLOOMFILTER);
if (value != null)
return Boolean.valueOf(value).booleanValue();
return DEFAULT_BLOOMFILTER;
public StoreFile.BloomType getBloomFilterType() {
String n = getValue(BLOOMFILTER);
if (n == null) {
n = DEFAULT_BLOOMFILTER;
}
return StoreFile.BloomType.valueOf(n.toUpperCase());
}
/**
* @param onOff Enable/Disable bloom filter
* @param bt bloom filter type to use for this column family
*/
public void setBloomfilter(final boolean onOff) {
setValue(BLOOMFILTER, Boolean.toString(onOff));
public void setBloomFilterType(final StoreFile.BloomType bt) {
setValue(BLOOMFILTER, bt.toString());
}
/**
@ -513,10 +518,6 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
values.entrySet()) {
String key = Bytes.toString(e.getKey().get());
String value = Bytes.toString(e.getValue().get());
if (key != null && key.toUpperCase().equals(BLOOMFILTER)) {
// Don't emit bloomfilter. Its not working.
continue;
}
s.append(", ");
s.append(key);
s.append(" => '");
@ -576,8 +577,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
int ordinal = in.readInt();
setCompressionType(Compression.Algorithm.values()[ordinal]);
setInMemory(in.readBoolean());
setBloomfilter(in.readBoolean());
if (isBloomfilter() && version < 5) {
setBloomFilterType(in.readBoolean() ? BloomType.ROW : BloomType.NONE);
if (getBloomFilterType() != BloomType.NONE && version < 5) {
// If a bloomFilter is enabled and the column descriptor is less than
// version 5, we need to skip over it to read the rest of the column
// descriptor. There are no BloomFilterDescriptors written to disk for
@ -593,7 +594,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setTimeToLive(in.readInt());
}
} else {
// version 7+
// version 6+
this.name = Bytes.readByteArray(in);
this.values.clear();
int numValues = in.readInt();
@ -602,6 +603,15 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
ImmutableBytesWritable value = new ImmutableBytesWritable();
key.readFields(in);
value.readFields(in);
// in version 8, the BloomFilter setting changed from bool to enum
if (version < 8 && Bytes.toString(key.get()).equals(BLOOMFILTER)) {
value.set(Bytes.toBytes(
Boolean.parseBoolean(Bytes.toString(value.get())) // parseBoolean, not getBoolean (which reads a system property)
? BloomType.ROW.toString()
: BloomType.NONE.toString()));
}
values.put(key, value);
}
if (version == 6) {

HTableDescriptor.java

@ -33,6 +33,7 @@ import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
@ -667,7 +668,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
new HColumnDescriptor[] { new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is an arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL) });
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
HConstants.REPLICATION_SCOPE_LOCAL) });
/** Table descriptor for <code>.META.</code> catalog table */
public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
@ -675,9 +677,11 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is an arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL),
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
HConstants.REPLICATION_SCOPE_LOCAL),
new HColumnDescriptor(HConstants.CATALOG_HISTORIAN_FAMILY,
HConstants.ALL_VERSIONS, Compression.Algorithm.NONE.getName(),
false, false, 8 * 1024,
HConstants.WEEK_IN_SECONDS, false, HConstants.REPLICATION_SCOPE_LOCAL)});
HConstants.WEEK_IN_SECONDS, StoreFile.BloomType.NONE.toString(),
HConstants.REPLICATION_SCOPE_LOCAL)});
}

KeyValue.java

@ -945,7 +945,7 @@ public class KeyValue implements Writable, HeapSize {
System.arraycopy(this.bytes, o, result, 0, l);
return result;
}
//---------------------------------------------------------------------------
//
// KeyValue splitter
@ -1371,7 +1371,7 @@ public class KeyValue implements Writable, HeapSize {
}
/**
* Compares the row and column of two keyvalues
* Compares the row and column of two keyvalues for equality
* @param left
* @param right
* @return True if same row and column.
@ -1380,10 +1380,10 @@ public class KeyValue implements Writable, HeapSize {
final KeyValue right) {
short lrowlength = left.getRowLength();
short rrowlength = right.getRowLength();
if (!matchingRows(left, lrowlength, right, rrowlength)) {
return false;
}
return compareColumns(left, lrowlength, right, rrowlength) == 0;
// TsOffset = end of column data. just comparing Row+CF length of each
return left.getTimestampOffset() == right.getTimestampOffset() &&
matchingRows(left, lrowlength, right, rrowlength) &&
compareColumns(left, lrowlength, right, rrowlength) == 0;
}
/**
@ -1396,6 +1396,7 @@ public class KeyValue implements Writable, HeapSize {
}
/**
* Compares the row of two keyvalues for equality
* @param left
* @param right
* @return True if rows match.
@ -1415,11 +1416,8 @@ public class KeyValue implements Writable, HeapSize {
*/
public boolean matchingRows(final KeyValue left, final short lrowlength,
final KeyValue right, final short rrowlength) {
int compare = compareRows(left, lrowlength, right, rrowlength);
if (compare != 0) {
return false;
}
return true;
return lrowlength == rrowlength &&
compareRows(left, lrowlength, right, rrowlength) == 0;
}
public boolean matchingRows(final byte [] left, final int loffset,

HFile.java

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@ -55,6 +57,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
@ -209,7 +212,7 @@ public class HFile {
private long valuelength = 0;
// Used to ensure we write in order.
private final RawComparator<byte []> comparator;
private final RawComparator<byte []> rawComparator;
// A stream made per block written.
private DataOutputStream out;
@ -239,7 +242,7 @@ public class HFile {
// Meta block system.
private ArrayList<byte []> metaNames = new ArrayList<byte []>();
private ArrayList<byte []> metaData = new ArrayList<byte[]>();
private ArrayList<Writable> metaData = new ArrayList<Writable>();
// Used compression. Used even if no compression -- 'none'.
private final Compression.Algorithm compressAlgo;
@ -273,7 +276,7 @@ public class HFile {
* @throws IOException
*/
public Writer(FileSystem fs, Path path, int blocksize,
String compress, final RawComparator<byte []> comparator)
String compress, final KeyComparator comparator)
throws IOException {
this(fs, path, blocksize,
compress == null? DEFAULT_COMPRESSION_ALGORITHM:
@ -292,7 +295,7 @@ public class HFile {
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress,
final RawComparator<byte []> comparator)
final KeyComparator comparator)
throws IOException {
this(fs.create(path), blocksize, compress, comparator);
this.closeOutputStream = true;
@ -309,7 +312,7 @@ public class HFile {
* @throws IOException
*/
public Writer(final FSDataOutputStream ostream, final int blocksize,
final String compress, final RawComparator<byte []> c)
final String compress, final KeyComparator c)
throws IOException {
this(ostream, blocksize,
Compression.getCompressionAlgorithmByName(compress), c);
@ -324,12 +327,12 @@ public class HFile {
* @throws IOException
*/
public Writer(final FSDataOutputStream ostream, final int blocksize,
final Compression.Algorithm compress, final RawComparator<byte []> c)
final Compression.Algorithm compress, final KeyComparator c)
throws IOException {
this.outputStream = ostream;
this.closeOutputStream = false;
this.blocksize = blocksize;
this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c;
this.rawComparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c;
this.name = this.outputStream.toString();
this.compressAlgo = compress == null?
DEFAULT_COMPRESSION_ALGORITHM: compress;
@ -423,11 +426,21 @@ public class HFile {
* small, consider adding to file info using
* {@link #appendFileInfo(byte[], byte[])}
* @param metaBlockName name of the block
* @param bytes uninterpreted bytes of the block.
* @param content Writable whose write() is called when the file is closed (DO NOT REUSE)
*/
public void appendMetaBlock(String metaBlockName, byte [] bytes) {
metaNames.add(Bytes.toBytes(metaBlockName));
metaData.add(bytes);
public void appendMetaBlock(String metaBlockName, Writable content) {
byte[] key = Bytes.toBytes(metaBlockName);
int i;
for (i = 0; i < metaNames.size(); ++i) {
// stop when the current key is greater than our own
byte[] cur = metaNames.get(i);
if (this.rawComparator.compare(cur, 0, cur.length, key, 0, key.length)
> 0) {
break;
}
}
metaNames.add(i, key);
metaData.add(i, content);
}
/**
@ -508,7 +521,7 @@ public class HFile {
* @param vlength
* @throws IOException
*/
public void append(final byte [] key, final int koffset, final int klength,
private void append(final byte [] key, final int koffset, final int klength,
final byte [] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
@ -552,7 +565,7 @@ public class HFile {
MAXIMUM_KEY_LENGTH);
}
if (this.lastKeyBuffer != null) {
int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset,
int keyComp = this.rawComparator.compare(this.lastKeyBuffer, this.lastKeyOffset,
this.lastKeyLength, key, offset, length);
if (keyComp > 0) {
throw new IOException("Added a key not lexically larger than" +
@ -595,10 +608,16 @@ public class HFile {
metaOffsets = new ArrayList<Long>(metaNames.size());
metaDataSizes = new ArrayList<Integer>(metaNames.size());
for (int i = 0 ; i < metaNames.size() ; ++ i ) {
metaOffsets.add(Long.valueOf(outputStream.getPos()));
metaDataSizes.
add(Integer.valueOf(METABLOCKMAGIC.length + metaData.get(i).length));
writeMetaBlock(metaData.get(i));
// store the beginning offset
long curPos = outputStream.getPos();
metaOffsets.add(curPos);
// write the metadata content
DataOutputStream dos = getCompressingStream();
dos.write(METABLOCKMAGIC);
metaData.get(i).write(dos);
int size = releaseCompressingStream(dos);
// store the metadata size
metaDataSizes.add(size);
}
}
@ -632,17 +651,6 @@ public class HFile {
}
}
/* Write a metadata block.
* @param metadata
* @throws IOException
*/
private void writeMetaBlock(final byte [] b) throws IOException {
DataOutputStream dos = getCompressingStream();
dos.write(METABLOCKMAGIC);
dos.write(b);
releaseCompressingStream(dos);
}
/*
* Add last bits of metadata to fileinfo and then write it out.
* Reader will be expecting to find all below.
@ -668,7 +676,7 @@ public class HFile {
appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN,
Bytes.toBytes(avgValueLen), false);
appendFileInfo(this.fileinfo, FileInfo.COMPARATOR,
Bytes.toBytes(this.comparator.getClass().getName()), false);
Bytes.toBytes(this.rawComparator.getClass().getName()), false);
long pos = o.getPos();
this.fileinfo.write(o);
return pos;
@ -710,6 +718,7 @@ public class HFile {
private final BlockCache cache;
public int cacheHits = 0;
public int blockLoads = 0;
public int metaLoads = 0;
// Whether file is from in-memory store
private boolean inMemory = false;
@ -717,15 +726,7 @@ public class HFile {
// Name for this object used when logging or in toString. Is either
// the result of a toString on the stream or else is toString of passed
// file Path plus metadata key/value pairs.
private String name;
/*
* Do not expose the default constructor.
*/
@SuppressWarnings("unused")
private Reader() throws IOException {
this(null, -1, null, false);
}
protected String name;
/**
* Opens a HFile. You must load the file info before you can
@ -799,7 +800,8 @@ public class HFile {
* See {@link Writer#appendFileInfo(byte[], byte[])}.
* @throws IOException
*/
public Map<byte [], byte []> loadFileInfo() throws IOException {
public Map<byte [], byte []> loadFileInfo()
throws IOException {
this.trailer = readTrailer();
// Read in the fileinfo and get what we need from it.
@ -889,16 +891,19 @@ public class HFile {
}
/**
* @param metaBlockName
* @param cacheBlock Add block to cache, if found
* @return Block wrapped in a ByteBuffer
* @throws IOException
*/
public ByteBuffer getMetaBlock(String metaBlockName) throws IOException {
public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
throws IOException {
if (trailer.metaIndexCount == 0) {
return null; // there are no meta blocks
}
if (metaIndex == null) {
throw new IOException("Meta index not loaded");
}
byte [] mbname = Bytes.toBytes(metaBlockName);
int block = metaIndex.blockContainingKey(mbname, 0, mbname.length);
if (block == -1)
@ -910,19 +915,45 @@ public class HFile {
blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block];
}
ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
longToInt(blockSize), metaIndex.blockDataSizes[block], true);
byte [] magic = new byte[METABLOCKMAGIC.length];
buf.get(magic, 0, magic.length);
long now = System.currentTimeMillis();
if (! Arrays.equals(magic, METABLOCKMAGIC)) {
throw new IOException("Meta magic is bad in block " + block);
// Per meta key from any given file, synchronize reads for said block
synchronized (metaIndex.blockKeys[block]) {
metaLoads++;
// Check cache for block. If found return.
if (cache != null) {
ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block);
if (cachedBuf != null) {
// Return a distinct 'shallow copy' of the block,
// so pos doesn't get messed up by the scanner
cacheHits++;
return cachedBuf.duplicate();
}
// Cache Miss, please load.
}
ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
longToInt(blockSize), metaIndex.blockDataSizes[block], true);
byte [] magic = new byte[METABLOCKMAGIC.length];
buf.get(magic, 0, magic.length);
if (! Arrays.equals(magic, METABLOCKMAGIC)) {
throw new IOException("Meta magic is bad in block " + block);
}
// Create a new ByteBuffer 'shallow copy' to hide the magic header
buf = buf.slice();
readTime += System.currentTimeMillis() - now;
readOps++;
// Cache the block
if(cacheBlock && cache != null) {
cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory);
}
return buf;
}
// Toss the header. May have to remove later due to performance.
buf.compact();
buf.limit(buf.limit() - METABLOCKMAGIC.length);
buf.rewind();
return buf;
}
/**
@ -952,8 +983,8 @@ public class HFile {
if (cache != null) {
ByteBuffer cachedBuf = cache.getBlock(name + block);
if (cachedBuf != null) {
// Return a distinct 'copy' of the block, so pos doesnt get messed by
// the scanner
// Return a distinct 'shallow copy' of the block,
// so pos doesn't get messed up by the scanner
cacheHits++;
return cachedBuf.duplicate();
}
@ -982,11 +1013,12 @@ public class HFile {
if (!Arrays.equals(magic, DATABLOCKMAGIC)) {
throw new IOException("Data magic is bad in block " + block);
}
// Toss the header. May have to remove later due to performance.
buf.compact();
buf.limit(buf.limit() - DATABLOCKMAGIC.length);
buf.rewind();
// 'shallow copy' to hide the header
// NOTE: you WILL GET BIT if you call buf.array() but don't start
// reading at buf.arrayOffset()
buf = buf.slice();
readTime += System.currentTimeMillis() - now;
readOps++;
@ -1045,6 +1077,9 @@ public class HFile {
return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
}
/**
* @return number of KV entries in this HFile
*/
public int getEntries() {
if (!this.isFileInfoLoaded()) {
throw new RuntimeException("File info not loaded");
@ -1061,6 +1096,13 @@ public class HFile {
}
return this.blockIndex.isEmpty()? null: this.lastkey;
}
/**
* @return number of keys in this HFile's bloom filter; returns the KV count if there is no filter.
*/
public int getFilterEntries() {
return getEntries();
}
/**
* @return Comparator.
@ -1099,7 +1141,7 @@ public class HFile {
/*
* Implementation of {@link HFileScanner} interface.
*/
private static class Scanner implements HFileScanner {
protected static class Scanner implements HFileScanner {
private final Reader reader;
private ByteBuffer block;
private int currBlock;
@ -1180,6 +1222,11 @@ public class HFile {
return true;
}
public boolean shouldSeek(final byte[] row,
final SortedSet<byte[]> columns) {
return true;
}
public int seekTo(byte [] key) throws IOException {
return seekTo(key, 0, key.length);
}
@ -1333,10 +1380,10 @@ public class HFile {
* parts of the file. Also includes basic metadata on this file.
*/
private static class FixedFileTrailer {
// Offset to the data block index.
long dataIndexOffset;
// Offset to the fileinfo data, a small block of vitals..
long fileinfoOffset;
// Offset to the data block index.
long dataIndexOffset;
// How many index counts are there (aka: block count)
int dataIndexCount;
// Offset to the meta block index.

HFileScanner.java

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedSet;
import org.apache.hadoop.hbase.KeyValue;
@ -64,6 +65,17 @@ public interface HFileScanner {
*/
public boolean seekBefore(byte [] key) throws IOException;
public boolean seekBefore(byte []key, int offset, int length) throws IOException;
/**
* Optimization for single key lookups. If the file has a filter,
* perform a lookup on the key.
* @param row the row to scan
* @param columns the column qualifiers to scan
* @return False if the key definitely does not exist in this ScanFile
*/
public boolean shouldSeek(final byte[] row,
final SortedSet<byte[]> columns);
/**
* Positions this scanner at the start of the file.
* @return False if empty file; i.e. a call to next would return false and

HFileOutputFormat.java

@ -112,7 +112,10 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
private void close(final HFile.Writer w) throws IOException {
if (w != null) {
StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
w.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY,
Bytes.toBytes(System.currentTimeMillis()));
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true));
w.close();
}
}

KeyValueHeap.java

@ -51,7 +51,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
* @param scanners
* @param comparator
*/
public KeyValueHeap(List<KeyValueScanner> scanners, KVComparator comparator) {
public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) {
this.comparator = new KVScannerComparator(comparator);
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
this.comparator);

MinorCompactingStoreScanner.java

@ -33,23 +33,20 @@ import java.util.List;
* and optionally the memstore-snapshot.
*/
public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
private KeyValueHeap heap;
private KeyValue.KVComparator comparator;
MinorCompactingStoreScanner(Store store,
List<KeyValueScanner> scanners) {
MinorCompactingStoreScanner(Store store, List<? extends KeyValueScanner> scanners) {
comparator = store.comparator;
KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
for (KeyValueScanner scanner : scanners ) {
scanner.seek(firstKv);
}
heap = new KeyValueHeap(scanners, store.comparator);
}
MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
List<KeyValueScanner> scanners) {
List<? extends KeyValueScanner> scanners) {
this.comparator = comparator;
KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);

Store.java

@ -101,7 +101,7 @@ public class Store implements HConstants, HeapSize {
private final HRegion region;
private final HColumnDescriptor family;
final FileSystem fs;
private final Configuration conf;
final Configuration conf;
// ttl in milliseconds.
protected long ttl;
private long majorCompactionTime;
@ -144,7 +144,6 @@ public class Store implements HConstants, HeapSize {
// Comparing KeyValues
final KeyValue.KVComparator comparator;
final KeyValue.KVComparator comparatorIgnoringType;
/**
* Constructor
@ -179,7 +178,6 @@ public class Store implements HConstants, HeapSize {
this.blocksize = family.getBlocksize();
this.compression = family.getCompression();
this.comparator = info.getComparator();
this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType();
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
this.ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
@ -415,7 +413,9 @@ public class Store implements HConstants, HeapSize {
}
StoreFile curfile = null;
try {
curfile = new StoreFile(fs, p, blockcache, this.conf, this.inMemory);
curfile = new StoreFile(fs, p, blockcache, this.conf,
this.family.getBloomFilterType(), this.inMemory);
curfile.createReader();
} catch (IOException ioe) {
LOG.warn("Failed open of " + p + "; presumption is that file was " +
"corrupted at flush and lost edits picked up by commit log replay. " +
@ -492,7 +492,7 @@ public class Store implements HConstants, HeapSize {
// Clear so metrics doesn't find them.
this.storefiles.clear();
for (StoreFile f: result) {
f.close();
f.closeReader();
}
LOG.debug("closed " + this.storeNameStr);
return result;
@ -534,7 +534,7 @@ public class Store implements HConstants, HeapSize {
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
final long logCacheFlushId)
throws IOException {
HFile.Writer writer = null;
StoreFile.Writer writer = null;
long flushed = 0;
// Don't flush if there are no entries.
if (set.size() == 0) {
@ -546,7 +546,7 @@ public class Store implements HConstants, HeapSize {
// if we fail.
synchronized (flushLock) {
// A. Write the map out to the disk
writer = getWriter();
writer = createWriter(this.homedir, set.size());
int entries = 0;
try {
for (KeyValue kv: set) {
@ -559,13 +559,13 @@ public class Store implements HConstants, HeapSize {
} finally {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
StoreFile.appendMetadata(writer, logCacheFlushId);
writer.appendMetadata(logCacheFlushId, false);
writer.close();
}
}
StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache,
this.conf, this.inMemory);
Reader r = sf.getReader();
this.conf, this.family.getBloomFilterType(), this.inMemory);
Reader r = sf.createReader();
this.storeSize += r.length();
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + sf + ", entries=" + r.getEntries() +
@ -577,22 +577,16 @@ public class Store implements HConstants, HeapSize {
return sf;
}
/**
* @return Writer for this store.
* @throws IOException
*/
HFile.Writer getWriter() throws IOException {
return getWriter(this.homedir);
}
/*
* @return Writer for this store.
* @param basedir Directory to put writer in.
* @throws IOException
*/
private HFile.Writer getWriter(final Path basedir) throws IOException {
return StoreFile.getWriter(this.fs, basedir, this.blocksize,
this.compression, this.comparator.getRawComparator());
private StoreFile.Writer createWriter(final Path basedir, int maxKeyCount)
throws IOException {
return StoreFile.createWriter(this.fs, basedir, this.blocksize,
this.compression, this.comparator, this.conf,
this.family.getBloomFilterType(), maxKeyCount);
}
/*
@ -880,13 +874,25 @@ public class Store implements HConstants, HeapSize {
private HFile.Writer compact(final List<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId)
throws IOException {
// calculate maximum key count after compaction (for blooms)
int maxKeyCount = 0;
for (StoreFile file : filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r != null) {
// NOTE: getFilterEntries could cause under-sized blooms if the user
// switches bloom type (e.g. from ROW to ROWCOL)
maxKeyCount += (r.getBloomFilterType() == family.getBloomFilterType())
? r.getFilterEntries() : r.getEntries();
}
}
// For each file, obtain a scanner:
List<KeyValueScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
filesToCompact, false, false);
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(filesToCompact, false, false);
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
HFile.Writer writer = null;
StoreFile.Writer writer = null;
try {
if (majorCompaction) {
InternalScanner scanner = null;
@ -901,7 +907,7 @@ public class Store implements HConstants, HeapSize {
// output to writer:
for (KeyValue kv : kvs) {
if (writer == null) {
writer = getWriter(this.regionCompactionDir);
writer = createWriter(this.regionCompactionDir, maxKeyCount);
}
writer.append(kv);
}
@ -916,7 +922,7 @@ public class Store implements HConstants, HeapSize {
MinorCompactingStoreScanner scanner = null;
try {
scanner = new MinorCompactingStoreScanner(this, scanners);
writer = getWriter(this.regionCompactionDir);
writer = createWriter(this.regionCompactionDir, maxKeyCount);
while (scanner.next(writer)) {
// Nothing to do
}
@ -927,7 +933,7 @@ public class Store implements HConstants, HeapSize {
}
} finally {
if (writer != null) {
StoreFile.appendMetadata(writer, maxId, majorCompaction);
writer.appendMetadata(maxId, majorCompaction);
writer.close();
}
}
@ -971,7 +977,9 @@ public class Store implements HConstants, HeapSize {
LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
return null;
}
result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory);
result = new StoreFile(this.fs, p, blockcache, this.conf,
this.family.getBloomFilterType(), this.inMemory);
result.createReader();
}
this.lock.writeLock().lock();
try {
@ -1001,7 +1009,7 @@ public class Store implements HConstants, HeapSize {
notifyChangedReadersObservers();
// Finally, delete old store files.
for (StoreFile hsf: compactedFiles) {
hsf.delete();
hsf.deleteReader();
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@ -1570,7 +1578,7 @@ public class Store implements HConstants, HeapSize {
}
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
ClassSize.OBJECT + (16 * ClassSize.REFERENCE) +
(6 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN +
ClassSize.align(ClassSize.ARRAY));

StoreFile.java

@ -21,26 +21,37 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HalfHFileReader;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.util.StringUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Map;
import java.util.SortedSet;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
@ -49,11 +60,11 @@ import java.util.regex.Pattern;
/**
* A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To
* create, call {@link #getWriter(FileSystem, Path)} and append data. Be
* create, call {@link #createWriter(FileSystem, Path, int)} and append data. Be
* sure to add any metadata before calling close on the Writer
* (Use the appendMetadata convenience methods). On close, a StoreFile is
* sitting in the Filesystem. To refer to it, create a StoreFile instance
* passing filesystem and path. To read, call {@link #getReader()}.
* passing filesystem and path. To read, call {@link #createReader()}.
* <p>StoreFiles may also reference store files in another Store.
*/
public class StoreFile implements HConstants {
@ -65,7 +76,7 @@ public class StoreFile implements HConstants {
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
// Need to make it 8k for testing.
private static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
private final FileSystem fs;
// This file's path.
@ -80,16 +91,23 @@ public class StoreFile implements HConstants {
private boolean inMemory;
// Keys for metadata stored in backing HFile.
private static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
/** Constant for the max sequence ID meta */
public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
// Set when we obtain a Reader.
private long sequenceid = -1;
private static final byte [] MAJOR_COMPACTION_KEY =
/** Constant for major compaction meta */
public static final byte [] MAJOR_COMPACTION_KEY =
Bytes.toBytes("MAJOR_COMPACTION_KEY");
// If true, this file was product of a major compaction. Its then set
// whenever you get a Reader.
private AtomicBoolean majorCompaction = null;
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
static final byte[] BLOOM_FILTER_TYPE_KEY =
Bytes.toBytes("BLOOM_FILTER_TYPE");
/*
* Regex that will work for straight filenames and for reference names.
* If reference, then the regex has more than just one group. Group 1 is
@ -98,11 +116,12 @@ public class StoreFile implements HConstants {
private static final Pattern REF_NAME_PARSER =
Pattern.compile("^(\\d+)(?:\\.(.+))?$");
private volatile HFile.Reader reader;
private volatile StoreFile.Reader reader;
// Used making file ids.
private final static Random rand = new Random();
private final Configuration conf;
private final BloomType bloomType;
/**
* Constructor, loads a reader and it's indices, etc. May allocate a
@ -112,10 +131,11 @@ public class StoreFile implements HConstants {
* @param p The path of the file.
* @param blockcache <code>true</code> if the block cache is enabled.
* @param conf The current configuration.
* @param bt The bloom type to use for this store file
* @throws IOException When opening the reader fails.
*/
StoreFile(final FileSystem fs, final Path p, final boolean blockcache,
final Configuration conf, final boolean inMemory)
final Configuration conf, final BloomType bt, final boolean inMemory)
throws IOException {
this.conf = conf;
this.fs = fs;
@ -126,7 +146,14 @@ public class StoreFile implements HConstants {
this.reference = Reference.read(fs, p);
this.referencePath = getReferredToFile(this.path);
}
this.reader = open();
// ignore if the column family config says "no bloom filter"
// even if there is one in the hfile.
if (conf.getBoolean("io.hfile.bloom.enabled", true)) {
this.bloomType = bt;
} else {
this.bloomType = BloomType.NONE;
LOG.info("Ignoring bloom filter check for file (disabled in config)");
}
}
/**
@ -255,18 +282,18 @@ public class StoreFile implements HConstants {
* Opens reader on this store file. Called by Constructor.
* @return Reader for the store file.
* @throws IOException
* @see #close()
* @see #closeReader()
*/
protected HFile.Reader open()
private StoreFile.Reader open()
throws IOException {
if (this.reader != null) {
throw new IllegalAccessError("Already open");
}
if (isReference()) {
this.reader = new HalfHFileReader(this.fs, this.referencePath,
this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
getBlockCache(), this.reference);
} else {
this.reader = new Reader(this.fs, this.path, getBlockCache(),
this.reader = new StoreFile.Reader(this.fs, this.path, getBlockCache(),
this.inMemory);
}
// Load up indices and fileinfo.
@ -296,44 +323,59 @@ public class StoreFile implements HConstants {
this.majorCompaction.set(mc);
}
}
if (this.bloomType != BloomType.NONE) {
this.reader.loadBloomfilter();
}
// TODO read in bloom filter here, ignore if the column family config says
// "no bloom filter" even if there is one in the hfile.
return this.reader;
}
/**
* @return Reader for this StoreFile; creates it if necessary.
* @throws IOException
*/
public StoreFile.Reader createReader() throws IOException {
if (this.reader == null) {
this.reader = open();
}
return this.reader;
}
/**
* @return Current reader. Must call open first else returns null.
* @return Current reader. Must call createReader first else returns null.
* @throws IOException
* @see #createReader()
*/
public HFile.Reader getReader() {
public StoreFile.Reader getReader() {
return this.reader;
}
/**
* @throws IOException
*/
public synchronized void close() throws IOException {
public synchronized void closeReader() throws IOException {
if (this.reader != null) {
this.reader.close();
this.reader = null;
}
}
@Override
public String toString() {
return this.path.toString() +
(isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
}
/**
* Delete this file
* @throws IOException
*/
public void delete() throws IOException {
close();
public void deleteReader() throws IOException {
closeReader();
this.fs.delete(getPath(), true);
}
@Override
public String toString() {
return this.path.toString() +
(isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
}
/**
* Utility to help with rename.
* @param fs
@ -361,38 +403,47 @@ public class StoreFile implements HConstants {
* @param fs
* @param dir Path to family directory. Makes the directory if doesn't exist.
* Creates a file with a unique name in this directory.
* @param blocksize size per filesystem block
* @return StoreFile.Writer
* @throws IOException
*/
public static HFile.Writer getWriter(final FileSystem fs, final Path dir)
public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir,
final int blocksize)
throws IOException {
return getWriter(fs, dir, DEFAULT_BLOCKSIZE_SMALL, null, null);
return createWriter(fs,dir,blocksize,null,null,null,BloomType.NONE,0);
}
/**
* Get a store file writer. Client is responsible for closing file when done.
* If metadata, add BEFORE closing using
* {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}.
* Create a store file writer. Client is responsible for closing file when done.
* If metadata, add BEFORE closing using appendMetadata()
* @param fs
* @param dir Path to family directory. Makes the directory if doesn't exist.
* Creates a file with a unique name in this directory.
* @param blocksize
* @param algorithm Pass null to get default.
* @param conf HBase system configuration; used with bloom filters
* @param bloomType column family setting for bloom filters
* @param c Pass null to get default.
* @param maxKeySize maximum expected number of keys (sizes the bloom filter for the configured error rate)
* @return StoreFile.Writer
* @throws IOException
*/
public static HFile.Writer getWriter(final FileSystem fs, final Path dir,
final int blocksize, final Compression.Algorithm algorithm,
final KeyValue.KeyComparator c)
public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir,
final int blocksize, final Compression.Algorithm algorithm,
final KeyValue.KVComparator c, final Configuration conf,
BloomType bloomType, int maxKeySize)
throws IOException {
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
Path path = getUniqueFile(fs, dir);
return new HFile.Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
c == null? KeyValue.KEY_COMPARATOR: c);
if(conf == null || !conf.getBoolean("io.hfile.bloom.enabled", true)) {
bloomType = BloomType.NONE;
}
return new StoreFile.Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize);
}
/**
@ -442,35 +493,6 @@ public class StoreFile implements HConstants {
return p;
}
/**
* Write file metadata.
* Call before you call close on the passed <code>w</code> since its written
* as metadata to that file.
*
* @param w hfile writer
* @param maxSequenceId Maximum sequence id.
* @throws IOException
*/
static void appendMetadata(final HFile.Writer w, final long maxSequenceId)
throws IOException {
appendMetadata(w, maxSequenceId, false);
}
/**
* Writes metadata.
* Call before you call close on the passed <code>w</code> since its written
* as metadata to that file.
* @param maxSequenceId Maximum sequence id.
* @param mc True if this file is product of a major compaction
* @throws IOException
*/
public static void appendMetadata(final HFile.Writer w, final long maxSequenceId,
final boolean mc)
throws IOException {
w.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(mc));
}
/*
* Write out a split reference.
* @param fs
@ -497,4 +519,298 @@ public class StoreFile implements HConstants {
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
return r.write(fs, p);
}
public static enum BloomType {
/**
* Bloomfilters disabled
*/
NONE,
/**
* Bloom enabled with Table row as Key
*/
ROW,
/**
* Bloom enabled with Table row & column (family+qualifier) as Key
*/
ROWCOL
}
/**
* Reader for a StoreFile: an HFile.Reader that additionally loads and
* consults the bloom filter meta/data blocks, when present.
*/
public static class Reader extends HFile.Reader {
/** Bloom Filter class. Caches only meta, pass in data */
protected BloomFilter bloomFilter = null;
/** Type of bloom filter (e.g. ROW vs ROWCOL) */
protected BloomType bloomFilterType;
public Reader(FileSystem fs, Path path, BlockCache cache,
boolean inMemory)
throws IOException {
super(fs, path, cache, inMemory);
}
public Reader(final FSDataInputStream fsdis, final long size,
final BlockCache cache, final boolean inMemory) {
super(fsdis,size,cache,inMemory);
bloomFilterType = BloomType.NONE;
}
@Override
public Map<byte [], byte []> loadFileInfo()
throws IOException {
Map<byte [], byte []> fi = super.loadFileInfo();
byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
if (b != null) {
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
}
return fi;
}
/**
* Load the bloom filter for this HFile into memory.
* Assumes the HFile has already been loaded
*/
public void loadBloomfilter() {
if (this.bloomFilter != null) {
return; // already loaded
}
// see if bloom filter information is in the metadata
try {
ByteBuffer b = getMetaBlock(BLOOM_FILTER_META_KEY, false);
if (b != null) {
if (bloomFilterType == BloomType.NONE) {
throw new IOException("valid bloom filter type not found in FileInfo");
}
this.bloomFilter = new ByteBloomFilter(b);
LOG.info("Loaded " + (bloomFilterType==BloomType.ROW? "row":"col")
+ " bloom filter metadata for " + name);
}
} catch (IOException e) {
LOG.error("Error reading bloom filter meta -- proceeding without", e);
this.bloomFilter = null;
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter meta -- proceeding without", e);
this.bloomFilter = null;
}
}
BloomFilter getBloomFilter() {
return this.bloomFilter;
}
/**
* @return bloom type information associated with this store file
*/
public BloomType getBloomFilterType() {
return this.bloomFilterType;
}
@Override
public int getFilterEntries() {
return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount()
: super.getFilterEntries();
}
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
return new Scanner(this, cacheBlocks, pread);
}
protected class Scanner extends HFile.Reader.Scanner {
public Scanner(Reader r, boolean cacheBlocks, final boolean pread) {
super(r, cacheBlocks, pread);
}
@Override
public boolean shouldSeek(final byte[] row,
final SortedSet<byte[]> columns) {
if (bloomFilter == null) {
return true;
}
byte[] key;
switch(bloomFilterType) {
case ROW:
key = row;
break;
case ROWCOL:
if (columns.size() == 1) {
byte[] col = columns.first();
key = Bytes.add(row, col);
break;
}
//$FALL-THROUGH$
default:
return true;
}
try {
ByteBuffer bloom = getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
if (bloom != null) {
return bloomFilter.contains(key, bloom);
}
} catch (IOException e) {
LOG.error("Error reading bloom filter data -- proceeding without",
e);
bloomFilter = null;
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter data -- proceeding without", e);
bloomFilter = null;
}
return true;
}
}
}
/**
* Writer for a StoreFile: an HFile.Writer that additionally builds a
* bloom filter as KeyValues are appended and writes it out as meta
* blocks when the file is closed.
*/
public static class Writer extends HFile.Writer {
private final BloomFilter bloomFilter;
private final BloomType bloomType;
private KVComparator kvComparator;
private KeyValue lastKv = null;
private byte[] lastByteArray = null;
/**
* Creates an HFile.Writer that also writes helpful meta data.
* @param fs file system to write to
* @param path file name to create
* @param blocksize HDFS block size
* @param compress HDFS block compression
* @param conf user configuration
* @param comparator key comparator
* @param bloomType bloom filter setting
* @param maxKeys maximum number of keys to add (used to size the bloom filter)
* @throws IOException problem writing to FS
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress, final Configuration conf,
final KVComparator comparator, BloomType bloomType, int maxKeys)
throws IOException {
super(fs, path, blocksize, compress, comparator.getRawComparator());
this.kvComparator = comparator;
if (bloomType != BloomType.NONE && conf != null) {
float err = conf.getFloat("io.hfile.bloom.error.rate", (float)0.01);
int maxFold = conf.getInt("io.hfile.bloom.max.fold", 7);
this.bloomFilter = new ByteBloomFilter(maxKeys, err,
Hash.getHashType(conf), maxFold);
this.bloomFilter.allocBloom();
this.bloomType = bloomType;
} else {
this.bloomFilter = null;
this.bloomType = BloomType.NONE;
}
}
/**
* Writes meta data.
* Call before {@link #close()} since it is written as meta data to this file.
* @param maxSequenceId Maximum sequence id.
* @param majorCompaction True if this file is product of a major compaction
* @throws IOException problem writing to FS
*/
public void appendMetadata(final long maxSequenceId,
final boolean majorCompaction)
throws IOException {
appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
}
@Override
public void append(final KeyValue kv)
throws IOException {
if (this.bloomFilter != null) {
// only add to the bloom filter on a new, unique key
boolean newKey = true;
if (this.lastKv != null) {
switch(bloomType) {
case ROW:
newKey = ! kvComparator.matchingRows(kv, lastKv);
break;
case ROWCOL:
newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
break;
case NONE:
newKey = false;
}
}
if (newKey) {
/*
* http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
* Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
*
* 2 Types of Filtering:
* 1. Row = Row
* 2. RowCol = Row + Qualifier
*/
switch (bloomType) {
case ROW:
this.bloomFilter.add(kv.getBuffer(), kv.getRowOffset(),
kv.getRowLength());
break;
case ROWCOL:
// merge(row, qualifier)
int ro = kv.getRowOffset();
int rl = kv.getRowLength();
int qo = kv.getQualifierOffset();
int ql = kv.getQualifierLength();
byte [] result = new byte[rl + ql];
System.arraycopy(kv.getBuffer(), ro, result, 0, rl);
System.arraycopy(kv.getBuffer(), qo, result, rl, ql);
this.bloomFilter.add(result);
break;
default:
}
this.lastKv = kv;
}
}
super.append(kv);
}
@Override
public void append(final byte [] key, final byte [] value)
throws IOException {
if (this.bloomFilter != null) {
// only add to the bloom filter on a new row
if(this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
this.bloomFilter.add(key);
this.lastByteArray = key;
}
}
super.append(key, value);
}
@Override
public void close()
throws IOException {
// make sure we wrote something to the bloom before adding it
if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) {
bloomFilter.finalize();
if (this.bloomFilter.getMaxKeys() > 0) {
int b = this.bloomFilter.getByteSize();
int k = this.bloomFilter.getKeyCount();
int m = this.bloomFilter.getMaxKeys();
StoreFile.LOG.info("Bloom added to HFile. " + b + "B, " +
k + "/" + m + " (" + NumberFormat.getPercentInstance().format(
((double)k) / ((double)m)) + ")");
}
appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter());
appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter());
appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
}
super.close();
}
}
}
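A short sketch, not in the patch, of how the flush and compaction paths use the Writer above: Store.createWriter in this commit calls StoreFile.createWriter with the family's BloomType and an estimated key count, and every append then feeds the bloom filter. The directory, the ROWCOL choice, the NONE compression, and the helper names here are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;

public class StoreFileWriterSketch {
  public static Path writeSortedKvs(FileSystem fs, Path familyDir, Configuration conf,
      Iterable<KeyValue> sortedKvs, int estimatedKeyCount, long maxSequenceId)
      throws java.io.IOException {
    // Mirrors Store.createWriter(): the bloom type comes from the column family
    // descriptor and estimatedKeyCount sizes the ByteBloomFilter for the
    // configured error rate (io.hfile.bloom.error.rate).
    StoreFile.Writer writer = StoreFile.createWriter(fs, familyDir,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL, Compression.Algorithm.NONE,
        KeyValue.COMPARATOR, conf, StoreFile.BloomType.ROWCOL, estimatedKeyCount);
    try {
      for (KeyValue kv : sortedKvs) {
        writer.append(kv);                // also feeds the bloom filter on new keys
      }
      writer.appendMetadata(maxSequenceId, false); // false: not a major compaction
    } finally {
      writer.close();                     // writes bloom meta/data blocks, then closes
    }
    return writer.getPath();
  }
}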

StoreFileScanner.java

@ -53,12 +53,12 @@ class StoreFileScanner implements KeyValueScanner {
* Return an array of scanners corresponding to the given
* set of store files.
*/
public static List<KeyValueScanner> getScannersForStoreFiles(
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> filesToCompact,
boolean cacheBlocks,
boolean usePread) {
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(filesToCompact.size());
List<StoreFileScanner> scanners =
new ArrayList<StoreFileScanner>(filesToCompact.size());
for (StoreFile file : filesToCompact) {
Reader r = file.getReader();
if (r == null) {
@ -72,6 +72,10 @@ class StoreFileScanner implements KeyValueScanner {
return scanners;
}
public HFileScanner getHFileScanner() {
return this.hfs;
}
public String toString() {
return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
}

StoreScanner.java

@ -61,10 +61,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
store.versionsToReturn(scan.getMaxVersions()));
this.isGet = scan.isGetScan();
List<KeyValueScanner> scanners = getScanners();
// pass columns = try to filter out unnecessary ScanFiles
List<KeyValueScanner> scanners = getScanners(scan, columns);
// Seek all scanners to the initial key
// TODO if scan.isGetScan, use bloomfilters to skip seeking
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
@ -83,7 +83,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
* @param scan the spec
* @param scanners ancilliary scanners
*/
StoreScanner(Store store, Scan scan, List<KeyValueScanner> scanners) {
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners) {
this.store = store;
this.cacheBlocks = false;
this.isGet = false;
@ -124,9 +124,37 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
private List<KeyValueScanner> getScanners() {
// First the store file scanners
Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(map.values(), cacheBlocks, isGet);
List<KeyValueScanner> scanners =
StoreFileScanner.getScannersForStoreFiles(map.values(),
cacheBlocks, isGet);
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(this.store.memstore.getScanners());
return scanners;
}
/*
* @return List of scanners to seek, possibly filtered by StoreFile.
*/
private List<KeyValueScanner> getScanners(Scan scan,
final NavigableSet<byte[]> columns) {
// First the store file scanners
Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(map.values(), cacheBlocks, isGet);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
// exclude scan files that have failed file filters
for(StoreFileScanner sfs : sfScanners) {
if (isGet &&
!sfs.getHFileScanner().shouldSeek(scan.getStartRow(), columns)) {
continue; // exclude this hfs
}
scanners.add(sfs);
}
// Then the memstore scanners
scanners.addAll(this.store.memstore.getScanners());
return scanners;

ColumnSchemaModel.java

@ -146,16 +146,15 @@ public class ColumnSchemaModel implements Serializable {
}
/**
* @return true if the BLOOMFILTER attribute is present and true
* @return the value of the BLOOMFILTER attribute or its default if unset
*/
public boolean __getBloomfilter() {
public String __getBloomfilter() {
Object o = attrs.get(BLOOMFILTER);
return o != null ?
Boolean.valueOf(o.toString()) : HColumnDescriptor.DEFAULT_BLOOMFILTER;
return o != null ? o.toString() : HColumnDescriptor.DEFAULT_BLOOMFILTER;
}
/**
* @return the value of the COMPRESSION attribute or its default if it is unset
* @return the value of the COMPRESSION attribute or its default if unset
*/
public String __getCompression() {
Object o = attrs.get(COMPRESSION);
@ -203,8 +202,8 @@ public class ColumnSchemaModel implements Serializable {
attrs.put(BLOCKCACHE, Boolean.toString(value));
}
public void __setBloomfilter(boolean value) {
attrs.put(BLOOMFILTER, Boolean.toString(value));
public void __setBloomfilter(String value) {
attrs.put(BLOOMFILTER, value);
}
/**

ThriftUtilities.java

@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TCell;
@ -47,10 +49,8 @@ public class ThriftUtilities {
throws IllegalArgument {
Compression.Algorithm comp =
Compression.getCompressionAlgorithmByName(in.compression.toLowerCase());
boolean bloom = false;
if (in.bloomFilterType.compareTo("NONE") != 0) {
bloom = true;
}
StoreFile.BloomType bt =
BloomType.valueOf(in.bloomFilterType);
if (in.name == null || in.name.length <= 0) {
throw new IllegalArgument("column name is empty");
@ -58,7 +58,7 @@ public class ThriftUtilities {
byte [] parsedName = KeyValue.parseColumn(in.name)[0];
HColumnDescriptor col = new HColumnDescriptor(parsedName,
in.maxVersions, comp.getName(), in.inMemory, in.blockCacheEnabled,
in.timeToLive, bloom);
in.timeToLive, bt.toString());
return col;
}
@ -77,7 +77,7 @@ public class ThriftUtilities {
col.compression = in.getCompression().toString();
col.inMemory = in.isInMemory();
col.blockCacheEnabled = in.isBlockCacheEnabled();
col.bloomFilterType = Boolean.toString(in.isBloomfilter());
col.bloomFilterType = in.getBloomFilterType().toString();
return col;
}
@ -147,4 +147,4 @@ public class ThriftUtilities {
Result [] result = { in };
return rowResultFromHBase(result);
}
}
}

Hash.java

@ -106,16 +106,29 @@ public abstract class Hash {
* @return hash value
*/
public int hash(byte[] bytes, int initval) {
return hash(bytes, bytes.length, initval);
return hash(bytes, 0, bytes.length, initval);
}
/**
* Calculate a hash using bytes from 0 to <code>length</code>, and
* the provided seed value
* @param bytes input bytes
* @param length length of the valid bytes to consider
* @param length length of the valid bytes after offset to consider
* @param initval seed value
* @return hash value
*/
public abstract int hash(byte[] bytes, int length, int initval);
public int hash(byte[] bytes, int length, int initval) {
return hash(bytes, 0, length, initval);
}
/**
* Calculate a hash using bytes from <code>offset</code> to
* <code>offset + length</code>, and the provided seed value
* @param bytes input bytes
* @param offset the offset into the array to start consideration
* @param length length of the valid bytes after offset to consider
* @param initval seed value
* @return hash value
*/
public abstract int hash(byte[] bytes, int offset, int length, int initval);
}
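
The offset-aware overload lets callers hash a sub-range of a larger buffer without copying it first, while the old two-int form simply delegates with offset 0. A small sketch of that equivalence, assuming the MurmurHash.getInstance() singleton from the Hadoop-derived class:

import java.util.Arrays;

import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;

public class HashOffsetExample {
  public static void main(String[] args) {
    byte[] buf = "prefix-payload-suffix".getBytes();
    int off = 7, len = 7; // the "payload" bytes
    Hash hash = MurmurHash.getInstance();

    // New overload: hash the sub-range in place, no copy needed.
    int inPlace = hash.hash(buf, off, len, -1);
    // Old-style call on a copied range; the results should agree because
    // hash(bytes, length, seed) now delegates with offset 0.
    int copied = hash.hash(Arrays.copyOfRange(buf, off, off + len), len, -1);

    System.out.println(inPlace == copied); // expected: true
  }
}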

View File

@ -80,11 +80,11 @@ public class JenkinsHash extends Hash {
*/
@Override
@SuppressWarnings("fallthrough")
public int hash(byte[] key, int nbytes, int initval) {
public int hash(byte[] key, int off, int nbytes, int initval) {
int length = nbytes;
long a, b, c; // We use longs because we don't have unsigned ints
a = b = c = (0x00000000deadbeefL + length + initval) & INT_MASK;
int offset = 0;
int offset = off;
for (; length > 12; offset += 12, length -= 12) {
//noinspection PointlessArithmeticExpression
a = (a + (key[offset + 0] & BYTE_MASK)) & INT_MASK;

View File

@ -35,7 +35,7 @@ public class MurmurHash extends Hash {
}
@Override
public int hash(byte[] data, int length, int seed) {
public int hash(byte[] data, int offset, int length, int seed) {
int m = 0x5bd1e995;
int r = 24;
@ -44,7 +44,7 @@ public class MurmurHash extends Hash {
int len_4 = length >> 2;
for (int i = 0; i < len_4; i++) {
int i_4 = i << 2;
int i_4 = (i << 2) + offset;
int k = data[i_4 + 3];
k = k << 8;
k = k | (data[i_4 + 2] & 0xff);
@ -63,16 +63,17 @@ public class MurmurHash extends Hash {
// avoid calculating modulo
int len_m = len_4 << 2;
int left = length - len_m;
int i_m = len_m + offset;
if (left != 0) {
if (left >= 3) {
h ^= data[len_m + 2] << 16;
h ^= data[i_m + 2] << 16;
}
if (left >= 2) {
h ^= data[len_m + 1] << 8;
h ^= data[i_m + 1] << 8;
}
if (left >= 1) {
h ^= data[len_m];
h ^= data[i_m];
}
h *= m;

View File

@ -334,7 +334,7 @@ module Hbase
arg[HColumnDescriptor::BLOCKCACHE]? JBoolean.valueOf(arg[HColumnDescriptor::BLOCKCACHE]): HColumnDescriptor::DEFAULT_BLOCKCACHE,
arg[HColumnDescriptor::BLOCKSIZE]? JInteger.valueOf(arg[HColumnDescriptor::BLOCKSIZE]): HColumnDescriptor::DEFAULT_BLOCKSIZE,
arg[HColumnDescriptor::TTL]? JInteger.new(arg[HColumnDescriptor::TTL]): HColumnDescriptor::DEFAULT_TTL,
arg[HColumnDescriptor::BLOOMFILTER]? JBoolean.valueOf(arg[HColumnDescriptor::BLOOMFILTER]): HColumnDescriptor::DEFAULT_BLOOMFILTER,
arg[HColumnDescriptor::BLOOMFILTER]? arg[HColumnDescriptor::BLOOMFILTER]: HColumnDescriptor::DEFAULT_BLOOMFILTER,
arg[HColumnDescriptor::REPLICATION_SCOPE]? JInteger.new(arg[REPLICATION_SCOPE]): HColumnDescriptor::DEFAULT_REPLICATION_SCOPE)
end
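
From the shell the BLOOMFILTER argument is now forwarded verbatim instead of being coerced to a boolean. For reference, the Java equivalent of what the shell builds, mirroring the nine-argument constructor used by the tests in this patch (family name, block size and versions are illustrative):

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

HColumnDescriptor hcd = new HColumnDescriptor(
    Bytes.toBytes("f1"),                  // family name
    3,                                    // max versions
    Compression.Algorithm.NONE.getName(), // compression
    false,                                // in-memory
    true,                                 // block cache
    64 * 1024,                            // block size
    HConstants.FOREVER,                   // time-to-live
    StoreFile.BloomType.ROW.toString(),   // bloom filter type
    HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);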

View File

@ -194,14 +194,19 @@ public abstract class HBaseTestCase extends TestCase {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(fam1, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL));
Integer.MAX_VALUE, HConstants.FOREVER,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam2, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL));
Integer.MAX_VALUE, HConstants.FOREVER,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam3, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
Integer.MAX_VALUE, HConstants.FOREVER,
false, HConstants.REPLICATION_SCOPE_LOCAL));
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
return htd;
}

View File

@ -333,7 +333,8 @@ public class HBaseTestingUtility {
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
desc.addFamily(hcd);
}
(new HBaseAdmin(getConfiguration())).createTable(desc);
@ -359,7 +360,8 @@ public class HBaseTestingUtility {
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
desc.addFamily(hcd);
i++;
}

View File

@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -29,12 +31,14 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.BlockIndex;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
/**
* test hfile features.
@ -170,7 +174,18 @@ public class TestHFile extends HBaseTestCase {
private void writeNumMetablocks(Writer writer, int n) {
for (int i = 0; i < n; i++) {
writer.appendMetaBlock("HFileMeta" + i, ("something to test" + i).getBytes());
writer.appendMetaBlock("HFileMeta" + i, new Writable() {
private int val;
public Writable setVal(int val) { this.val = val; return this; }
@Override
public void write(DataOutput out) throws IOException {
out.write(("something to test" + val).getBytes());
}
@Override
public void readFields(DataInput in) throws IOException { }
}.setVal(i));
}
}
@ -180,10 +195,10 @@ public class TestHFile extends HBaseTestCase {
private void readNumMetablocks(Reader reader, int n) throws IOException {
for (int i = 0; i < n; i++) {
ByteBuffer b = reader.getMetaBlock("HFileMeta" + i);
byte [] found = Bytes.toBytes(b);
assertTrue("failed to match metadata", Arrays.equals(
("something to test" + i).getBytes(), found));
ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false);
ByteBuffer expected =
ByteBuffer.wrap(("something to test" + i).getBytes());
assertTrue("failed to match metadata", actual.compareTo(expected) == 0);
}
}
@ -227,7 +242,7 @@ public class TestHFile extends HBaseTestCase {
fout.close();
Reader reader = new Reader(fs, mFile, null, false);
reader.loadFileInfo();
assertNull(reader.getMetaBlock("non-existant"));
assertNull(reader.getMetaBlock("non-existant", false));
}
/**
@ -244,7 +259,7 @@ public class TestHFile extends HBaseTestCase {
Path mFile = new Path(ROOT_DIR, "meta.tfile");
FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = new Writer(fout, minBlockSize, (Compression.Algorithm) null,
new RawComparator<byte []>() {
new KeyComparator() {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {

View File

@ -68,7 +68,8 @@ public class TestScanner extends HBaseTestCase {
TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is an arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL));
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
HConstants.REPLICATION_SCOPE_LOCAL));
}
/** HRegionInfo for root region */
public static final HRegionInfo REGION_INFO =

View File

@ -134,8 +134,9 @@ public class TestStore extends TestCase {
long seqid = f.getMaxSequenceId();
HBaseConfiguration c = new HBaseConfiguration();
FileSystem fs = FileSystem.get(c);
Writer w = StoreFile.getWriter(fs, storedir);
StoreFile.appendMetadata(w, seqid + 1);
StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
StoreFile.DEFAULT_BLOCKSIZE_SMALL);
w.appendMetadata(seqid + 1, false);
w.close();
this.store.close();
// Reopen it... should pick up two files
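
StoreFile.getWriter and the static appendMetadata call have given way to a StoreFile.Writer with its own appendMetadata method, as the updated test shows. A minimal sketch of the new write path, assuming a Configuration named conf is in scope and with an illustrative store directory:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;

FileSystem fs = FileSystem.get(conf);                        // conf assumed in scope
Path storedir = new Path("/hbase/tablename/region/family");  // illustrative
StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
    StoreFile.DEFAULT_BLOCKSIZE_SMALL);
// ... append KeyValues in sorted order here ...
long maxSequenceId = 42L;                                    // illustrative
// The boolean presumably flags major-compaction output, matching the
// "false" used by the test above.
w.appendMetadata(maxSequenceId, false);
w.close();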

View File

@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@ -69,11 +72,11 @@ public class TestStoreFile extends HBaseTestCase {
*/
public void testBasicHalfMapFile() throws Exception {
// Make up a directory hierarchy that has a regiondir and familyname.
HFile.Writer writer = StoreFile.getWriter(this.fs,
new Path(new Path(this.testDir, "regionname"), "familyname"),
2 * 1024, null, null);
HFile.Writer writer = StoreFile.createWriter(this.fs,
new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024);
writeStoreFile(writer);
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, false));
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf,
StoreFile.BloomType.NONE, false));
}
/*
@ -109,11 +112,11 @@ public class TestStoreFile extends HBaseTestCase {
Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname");
Path dir = new Path(storedir, "1234567890");
// Make a store file and write data to it.
HFile.Writer writer = StoreFile.getWriter(this.fs, dir, 8 * 1024, null,
null);
HFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
writeStoreFile(writer);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, false);
HFile.Reader reader = hsf.getReader();
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
StoreFile.BloomType.NONE, false);
HFile.Reader reader = hsf.createReader();
// Split on a row, not in middle of row. Midkey returned by reader
// may be in middle of row. Create new one with empty column and
// timestamp.
@ -123,10 +126,11 @@ public class TestStoreFile extends HBaseTestCase {
byte [] finalRow = kv.getRow();
// Make a reference
Path refPath = StoreFile.split(fs, dir, hsf, midRow, Range.top);
StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, false);
StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf,
StoreFile.BloomType.NONE, false);
// Now confirm that I can read from the reference and that it only gets
// keys from top half of the file.
HFileScanner s = refHsf.getReader().getScanner(false, false);
HFileScanner s = refHsf.createReader().getScanner(false, false);
for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
ByteBuffer bb = s.getKey();
kv = KeyValue.createKeyValueFromKey(bb);
@ -140,7 +144,7 @@ public class TestStoreFile extends HBaseTestCase {
private void checkHalfHFile(final StoreFile f)
throws IOException {
byte [] midkey = f.getReader().midkey();
byte [] midkey = f.createReader().midkey();
KeyValue midKV = KeyValue.createKeyValueFromKey(midkey);
byte [] midRow = midKV.getRow();
// Create top split.
@ -159,8 +163,10 @@ public class TestStoreFile extends HBaseTestCase {
Path bottomPath = StoreFile.split(this.fs, bottomDir,
f, midRow, Range.bottom);
// Make readers on top and bottom.
HFile.Reader top = new StoreFile(this.fs, topPath, true, conf, false).getReader();
HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader();
HFile.Reader top = new StoreFile(this.fs, topPath, true, conf,
StoreFile.BloomType.NONE, false).createReader();
HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf,
StoreFile.BloomType.NONE, false).createReader();
ByteBuffer previous = null;
LOG.info("Midkey: " + midKV.toString());
ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midkey);
@ -212,8 +218,10 @@ public class TestStoreFile extends HBaseTestCase {
topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top);
bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey,
Range.bottom);
top = new StoreFile(this.fs, topPath, true, conf, false).getReader();
bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader();
top = new StoreFile(this.fs, topPath, true, conf,
StoreFile.BloomType.NONE, false).createReader();
bottom = new StoreFile(this.fs, bottomPath, true, conf,
StoreFile.BloomType.NONE, false).createReader();
bottomScanner = bottom.getScanner(false, false);
int count = 0;
while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
@ -256,8 +264,10 @@ public class TestStoreFile extends HBaseTestCase {
topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top);
bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey,
Range.bottom);
top = new StoreFile(this.fs, topPath, true, conf, false).getReader();
bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader();
top = new StoreFile(this.fs, topPath, true, conf,
StoreFile.BloomType.NONE, false).createReader();
bottom = new StoreFile(this.fs, bottomPath, true, conf,
StoreFile.BloomType.NONE, false).createReader();
first = true;
bottomScanner = bottom.getScanner(false, false);
while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
@ -296,4 +306,138 @@ public class TestStoreFile extends HBaseTestCase {
fs.delete(f.getPath(), true);
}
}
}
private static String ROOT_DIR =
System.getProperty("test.build.data", "/tmp/TestStoreFile");
private static String localFormatter = "%010d";
public void testBloomFilter() throws Exception {
FileSystem fs = FileSystem.getLocal(conf);
conf.setFloat("io.hfile.bloom.error.rate", (float)0.01);
conf.setBoolean("io.hfile.bloom.enabled", true);
// write the file
Path f = new Path(ROOT_DIR, getName());
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
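// The writer above is sized for ~2000 keys (presumably the bloom sizing
// hint); the 1% error-rate target comes from io.hfile.bloom.error.rate set
// on the conf above, and only every other row (1000 in total) is written.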
long now = System.currentTimeMillis();
for (int i = 0; i < 2000; i += 2) {
String row = String.format(localFormatter, Integer.valueOf(i));
KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
"col".getBytes(), now, "value".getBytes());
writer.append(kv);
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
reader.loadFileInfo();
reader.loadBloomfilter();
HFileScanner scanner = reader.getScanner(false, false);
// check the false positive rate
int falsePos = 0;
int falseNeg = 0;
for (int i = 0; i < 2000; i++) {
String row = String.format(localFormatter, Integer.valueOf(i));
TreeSet<byte[]> columns = new TreeSet<byte[]>();
columns.add("family:col".getBytes());
boolean exists = scanner.shouldSeek(row.getBytes(), columns);
if (i % 2 == 0) {
if (!exists) falseNeg++;
} else {
if (exists) falsePos++;
}
}
reader.close();
fs.delete(f, true);
System.out.println("False negatives: " + falseNeg);
assertEquals(0, falseNeg);
System.out.println("False positives: " + falsePos);
assertTrue(falsePos < 2);
}
public void testBloomTypes() throws Exception {
float err = (float) 0.01;
FileSystem fs = FileSystem.getLocal(conf);
conf.setFloat("io.hfile.bloom.error.rate", err);
conf.setBoolean("io.hfile.bloom.enabled", true);
int rowCount = 50;
int colCount = 10;
int versions = 2;
// run once using columns and once using rows
StoreFile.BloomType[] bt =
{StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW};
int[] expKeys = {rowCount*colCount, rowCount};
// The next line deserves commentary: it is the expected number of bloom
// false positives for each type.
//   column-level (ROWCOL) = rowCount*2*colCount inserts
//   row-level (ROW) = only rowCount*2 inserts, but failures will be
//   magnified by the 2nd for loop over every column (2*colCount)
float[] expErr = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err};
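// With rowCount=50, colCount=10 and err=0.01 this works out to
// expErr = {10, 20}, so the assert below tolerates fewer than 20 (ROWCOL)
// and fewer than 40 (ROW) false positives.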
for (int x : new int[]{0,1}) {
// write the file
Path f = new Path(ROOT_DIR, getName());
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, bt[x], expKeys[x]);
long now = System.currentTimeMillis();
for (int i = 0; i < rowCount*2; i += 2) { // rows
for (int j = 0; j < colCount*2; j += 2) { // column qualifiers
String row = String.format(localFormatter, Integer.valueOf(i));
String col = String.format(localFormatter, Integer.valueOf(j));
for (int k= 0; k < versions; ++k) { // versions
KeyValue kv = new KeyValue(row.getBytes(),
"family".getBytes(), ("col" + col).getBytes(),
now-k, Bytes.toBytes((long)-1));
writer.append(kv);
}
}
}
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
reader.loadFileInfo();
reader.loadBloomfilter();
HFileScanner scanner = reader.getScanner(false, false);
assertEquals(expKeys[x], reader.getBloomFilter().getKeyCount());
// check the false positive rate
int falsePos = 0;
int falseNeg = 0;
for (int i = 0; i < rowCount*2; ++i) { // rows
for (int j = 0; j < colCount*2; ++j) { // column qualifiers
String row = String.format(localFormatter, Integer.valueOf(i));
String col = String.format(localFormatter, Integer.valueOf(j));
TreeSet<byte[]> columns = new TreeSet<byte[]>();
columns.add(("col" + col).getBytes());
boolean exists = scanner.shouldSeek(row.getBytes(), columns);
boolean shouldRowExist = i % 2 == 0;
boolean shouldColExist = j % 2 == 0;
shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW;
if (shouldRowExist && shouldColExist) {
if (!exists) falseNeg++;
} else {
if (exists) falsePos++;
}
}
}
reader.close();
fs.delete(f, true);
System.out.println(bt[x].toString());
System.out.println(" False negatives: " + falseNeg);
System.out.println(" False positives: " + falsePos);
assertEquals(0, falseNeg);
assertTrue(falsePos < 2*expErr[x]);
}
}
}

View File

@ -52,7 +52,8 @@ public class TestWideScanner extends HBaseTestCase {
TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is an arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
}
/** HRegionInfo for root region */
public static final HRegionInfo REGION_INFO =

View File

@ -33,7 +33,7 @@ public class TestColumnSchemaModel extends TestCase {
protected static final String COLUMN_NAME = "testcolumn";
protected static final boolean BLOCKCACHE = true;
protected static final int BLOCKSIZE = 16384;
protected static final boolean BLOOMFILTER = false;
protected static final String BLOOMFILTER = "none";
protected static final String COMPRESSION = "GZ";
protected static final boolean IN_MEMORY = false;
protected static final int TTL = 86400;
@ -42,7 +42,7 @@ public class TestColumnSchemaModel extends TestCase {
protected static final String AS_XML =
"<ColumnSchema name=\"testcolumn\"" +
" BLOCKSIZE=\"16384\"" +
" BLOOMFILTER=\"false\"" +
" BLOOMFILTER=\"none\"" +
" BLOCKCACHE=\"true\"" +
" COMPRESSION=\"GZ\"" +
" VERSIONS=\"1\"" +