HBASE-3857 Change the HFile Format (Mikhail & Liyin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153634 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-08-03 19:59:48 +00:00
parent e2f64664a6
commit 121c5b2e47
39 changed files with 1497 additions and 2444 deletions


@ -395,7 +395,7 @@ Release 0.91.0 - Unreleased
HBASE-3691 Add compressor support for 'snappy', google's compressor
(Nichole Treadway and Nicholas Telford)
HBASE-2233 Support both Hadoop 0.20 and 0.22
HBASE-3857 Change the HFile Format (Mikhail & Liyin)
Release 0.90.4 - Unreleased


@ -83,13 +83,29 @@ implements WritableComparable<HServerLoad> {
private int storefileSizeMB;
/** the current size of the memstore for the region, in MB */
private int memstoreSizeMB;
/** the current total size of storefile indexes for the region, in MB */
/**
* The current total size of root-level store file indexes for the region,
* in MB. The same as {@link #rootIndexSizeKB} but in MB.
*/
private int storefileIndexSizeMB;
/** the current total read requests made to region */
private int readRequestsCount;
/** the current total write requests made to region */
private int writeRequestsCount;
/** The current total size of root-level indexes for the region, in KB. */
private int rootIndexSizeKB;
/** The total size of all index blocks, not just the root level, in KB. */
private int totalStaticIndexSizeKB;
/**
* The total size of all Bloom filter blocks, not just the ones loaded into
* the block cache, in KB.
*/
private int totalStaticBloomSizeKB;
/**
* Constructor, for Writable
*/
@ -111,6 +127,8 @@ implements WritableComparable<HServerLoad> {
final int storefiles, final int storeUncompressedSizeMB,
final int storefileSizeMB,
final int memstoreSizeMB, final int storefileIndexSizeMB,
final int rootIndexSizeKB, final int totalStaticIndexSizeKB,
final int totalStaticBloomSizeKB,
final int readRequestsCount, final int writeRequestsCount) {
this.name = name;
this.stores = stores;
@ -119,6 +137,9 @@ implements WritableComparable<HServerLoad> {
this.storefileSizeMB = storefileSizeMB;
this.memstoreSizeMB = memstoreSizeMB;
this.storefileIndexSizeMB = storefileIndexSizeMB;
this.rootIndexSizeKB = rootIndexSizeKB;
this.totalStaticIndexSizeKB = totalStaticIndexSizeKB;
this.totalStaticBloomSizeKB = totalStaticBloomSizeKB;
this.readRequestsCount = readRequestsCount;
this.writeRequestsCount = writeRequestsCount;
}
@ -263,6 +284,9 @@ implements WritableComparable<HServerLoad> {
this.storefileIndexSizeMB = in.readInt();
this.readRequestsCount = in.readInt();
this.writeRequestsCount = in.readInt();
this.rootIndexSizeKB = in.readInt();
this.totalStaticIndexSizeKB = in.readInt();
this.totalStaticBloomSizeKB = in.readInt();
}
public void write(DataOutput out) throws IOException {
@ -278,6 +302,9 @@ implements WritableComparable<HServerLoad> {
out.writeInt(storefileIndexSizeMB);
out.writeInt(readRequestsCount);
out.writeInt(writeRequestsCount);
out.writeInt(rootIndexSizeKB);
out.writeInt(totalStaticIndexSizeKB);
out.writeInt(totalStaticBloomSizeKB);
}
/**
@ -306,6 +333,12 @@ implements WritableComparable<HServerLoad> {
Long.valueOf(this.readRequestsCount));
sb = Strings.appendKeyValue(sb, "writeRequestsCount",
Long.valueOf(this.writeRequestsCount));
sb = Strings.appendKeyValue(sb, "rootIndexSizeKB",
Integer.valueOf(this.rootIndexSizeKB));
sb = Strings.appendKeyValue(sb, "totalStaticIndexSizeKB",
Integer.valueOf(this.totalStaticIndexSizeKB));
sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB",
Integer.valueOf(this.totalStaticBloomSizeKB));
return sb.toString();
}
}


@ -19,12 +19,15 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/**
* Block cache interface.
* Block cache interface. Anything that implements the {@link HeapSize}
* interface can be put in the cache, because item size is all the cache
* cares about. We might move to a more specialized "cacheable" interface
* in the future.
*
* TODO: Add filename or hash of filename to block cache key.
*/
public interface BlockCache {
@ -34,14 +37,14 @@ public interface BlockCache {
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
*/
public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory);
public void cacheBlock(String blockName, HeapSize buf, boolean inMemory);
/**
* Add block to cache (defaults to not in-memory).
* @param blockName Zero-based file block number.
* @param buf The block contents wrapped in a ByteBuffer.
*/
public void cacheBlock(String blockName, ByteBuffer buf);
public void cacheBlock(String blockName, HeapSize buf);
/**
* Fetch block from cache.
@ -49,7 +52,7 @@ public interface BlockCache {
* @param caching Whether this request has caching enabled (used for stats)
* @return Block or null if block is not in the cache.
*/
public ByteBuffer getBlock(String blockName, boolean caching);
public HeapSize getBlock(String blockName, boolean caching);
/**
* Evict block from cache.
@ -58,6 +61,17 @@ public interface BlockCache {
*/
public boolean evictBlock(String blockName);
/**
* Evicts all blocks whose names start with the given prefix. This is
* necessary in cases where we need to evict all blocks that belong to a
* particular HFile. In HFile v2, block cache keys consist of the store file
* name (a UUID), an underscore, and the block offset in the file. An
* efficient implementation would avoid scanning all blocks in the cache.
*
* @return the number of blocks evicted
*/
public int evictBlocksByPrefix(String prefix);
/**
* Get the statistics for this block cache.
* @return
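As a reading aid, here is a minimal sketch of how a caller might exercise the HeapSize-based contract above; the wrapper class and block names are hypothetical, and only the BlockCache methods shown in this interface are assumed.

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;

public class BlockCacheUsageSketch {
  /** Hypothetical HeapSize wrapper around a raw block buffer. */
  static class BufferEntry implements HeapSize {
    private final ByteBuffer buf;
    BufferEntry(ByteBuffer buf) { this.buf = buf; }
    ByteBuffer getBuffer() { return buf; }
    public long heapSize() {
      // A real entry would also account for object and field overhead.
      return buf.capacity();
    }
  }

  static void demo(BlockCache cache) {
    // HFile v2 style cache key: "<storeFileUUID>_<blockOffset>".
    String key = "0123456789abcdef_0";
    cache.cacheBlock(key, new BufferEntry(ByteBuffer.allocate(64 * 1024)));
    HeapSize block = cache.getBlock(key, true);   // caching=true so the hit/miss is counted
    // On store file close, drop every block belonging to that file.
    cache.evictBlocksByPrefix("0123456789abcdef_");
  }
}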


@ -55,22 +55,22 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
};
private final String blockName;
private final ByteBuffer buf;
private final HeapSize buf;
private volatile long accessTime;
private long size;
private BlockPriority priority;
public CachedBlock(String blockName, ByteBuffer buf, long accessTime) {
public CachedBlock(String blockName, HeapSize buf, long accessTime) {
this(blockName, buf, accessTime, false);
}
public CachedBlock(String blockName, ByteBuffer buf, long accessTime,
public CachedBlock(String blockName, HeapSize buf, long accessTime,
boolean inMemory) {
this.blockName = blockName;
this.buf = buf;
this.accessTime = accessTime;
this.size = ClassSize.align(blockName.length()) +
ClassSize.align(buf.capacity()) + PER_BLOCK_OVERHEAD;
this.size = ClassSize.align(blockName.length())
+ ClassSize.align(buf.heapSize()) + PER_BLOCK_OVERHEAD;
if(inMemory) {
this.priority = BlockPriority.MEMORY;
} else {
@ -97,7 +97,7 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
return this.accessTime < that.accessTime ? 1 : -1;
}
public ByteBuffer getBuffer() {
public HeapSize getBuffer() {
return this.buf;
}

File diff suppressed because it is too large


@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.io.hfile;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
@ -251,7 +250,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param buf block buffer
* @param inMemory if block is in-memory
*/
public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory) {
public void cacheBlock(String blockName, HeapSize buf, boolean inMemory) {
CachedBlock cb = map.get(blockName);
if(cb != null) {
throw new RuntimeException("Cached an already cached block");
@ -275,7 +274,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param blockName block name
* @param buf block buffer
*/
public void cacheBlock(String blockName, ByteBuffer buf) {
public void cacheBlock(String blockName, HeapSize buf) {
cacheBlock(blockName, buf, false);
}
@ -284,7 +283,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param blockName block name
* @return buffer of specified block name, or null if not in cache
*/
public ByteBuffer getBlock(String blockName, boolean caching) {
public HeapSize getBlock(String blockName, boolean caching) {
CachedBlock cb = map.get(blockName);
if(cb == null) {
stats.miss(caching);
@ -304,6 +303,31 @@ public class LruBlockCache implements BlockCache, HeapSize {
return true;
}
/**
* Evicts all blocks whose name starts with the given prefix. This is an
* expensive operation implemented as a linear-time search through all blocks
* in the cache. Ideally this should be a search in a log-access-time map.
*
* <p>
* This is used for evict-on-close to remove all blocks of a specific HFile.
* The prefix would be the HFile/StoreFile name (a UUID) followed by an
* underscore, because HFile v2 block names in cache are of the form
* "&lt;storeFileUUID&gt;_&lt;blockOffset&gt;".
*
* @return the number of blocks evicted
*/
@Override
public int evictBlocksByPrefix(String prefix) {
int numEvicted = 0;
for (String key : map.keySet()) {
if (key.startsWith(prefix)) {
if (evictBlock(key))
++numEvicted;
}
}
return numEvicted;
}
protected long evictBlock(CachedBlock block) {
map.remove(block.getName());
size.addAndGet(-1 * block.heapSize());


@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
@ -32,10 +33,10 @@ import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
* Simple one RFile soft reference cache.
*/
public class SimpleBlockCache implements BlockCache {
private static class Ref extends SoftReference<ByteBuffer> {
private static class Ref extends SoftReference<HeapSize> {
public String blockId;
public Ref(String blockId, ByteBuffer buf, ReferenceQueue q) {
super(buf, q);
public Ref(String blockId, HeapSize block, ReferenceQueue q) {
super(block, q);
this.blockId = blockId;
}
}
@ -68,7 +69,7 @@ public class SimpleBlockCache implements BlockCache {
return cache.size();
}
public synchronized ByteBuffer getBlock(String blockName, boolean caching) {
public synchronized HeapSize getBlock(String blockName, boolean caching) {
processQueue(); // clear out some crap.
Ref ref = cache.get(blockName);
if (ref == null)
@ -76,13 +77,13 @@ public class SimpleBlockCache implements BlockCache {
return ref.get();
}
public synchronized void cacheBlock(String blockName, ByteBuffer buf) {
cache.put(blockName, new Ref(blockName, buf, q));
public synchronized void cacheBlock(String blockName, HeapSize block) {
cache.put(blockName, new Ref(blockName, block, q));
}
public synchronized void cacheBlock(String blockName, ByteBuffer buf,
public synchronized void cacheBlock(String blockName, HeapSize block,
boolean inMemory) {
cache.put(blockName, new Ref(blockName, buf, q));
cache.put(blockName, new Ref(blockName, block, q));
}
@Override
@ -117,4 +118,10 @@ public class SimpleBlockCache implements BlockCache {
// TODO: implement this if we ever actually use this block cache
return 0;
}
@Override
public int evictBlocksByPrefix(String prefix) {
throw new UnsupportedOperationException();
}
}


@ -79,7 +79,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
// Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
Configuration conf = context.getConfiguration();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
final long maxsize = conf.getLong("hbase.hregion.max.filesize",
@ -132,7 +132,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
// create a new HFile writer, if necessary
if (wl == null || wl.writer == null) {
wl = getNewWriter(family);
wl = getNewWriter(family, conf);
}
// we now have the proper HFile writer. full steam ahead
@ -163,12 +163,13 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
* @return A WriterLength, containing a new HFile.Writer.
* @throws IOException
*/
private WriterLength getNewWriter(byte[] family) throws IOException {
private WriterLength getNewWriter(byte[] family, Configuration conf)
throws IOException {
WriterLength wl = new WriterLength();
Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
wl.writer = new HFile.Writer(fs,
wl.writer = HFile.getWriterFactory(conf).createWriter(fs,
StoreFile.getUniqueFile(fs, familydir), blocksize,
compression, KeyValue.KEY_COMPARATOR);
this.writers.put(family, wl);


@ -288,7 +288,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false, false);
HFile.Reader hfr = HFile.createReader(fs, hfilePath, null, false, false);
final byte[] first, last;
try {
hfr.loadFileInfo();
@ -390,7 +390,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
halfWriter = new StoreFile.Writer(
fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
bloomFilterType, 0, false);
bloomFilterType, 0);
HFileScanner scanner = halfReader.getScanner(false, false);
scanner.seekTo();
do {
@ -490,7 +490,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
for (Path hfile : hfiles) {
if (hfile.getName().startsWith("_")) continue;
HFile.Reader reader = new HFile.Reader(fs, hfile, null, false, false);
HFile.Reader reader = HFile.createReader(fs, hfile, null, false, false);
final byte[] first, last;
try {
reader.loadFileInfo();


@ -926,6 +926,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
int storefileSizeMB = 0;
int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
int storefileIndexSizeMB = 0;
int rootIndexSizeKB = 0;
int totalStaticIndexSizeKB = 0;
int totalStaticBloomSizeKB = 0;
synchronized (r.stores) {
stores += r.stores.size();
for (Store store : r.stores.values()) {
@ -934,11 +937,21 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
/ 1024 / 1024);
storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
rootIndexSizeKB +=
(int) (store.getStorefilesIndexSize() / 1024);
totalStaticIndexSizeKB +=
(int) (store.getTotalStaticIndexSize() / 1024);
totalStaticBloomSizeKB +=
(int) (store.getTotalStaticBloomSize() / 1024);
}
}
return new HServerLoad.RegionLoad(name,stores, storefiles,
return new HServerLoad.RegionLoad(name, stores, storefiles,
storeUncompressedSizeMB,
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB,
storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
totalStaticIndexSizeKB, totalStaticBloomSizeKB,
(int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get());
}
@ -1197,6 +1210,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
int readRequestsCount = 0;
int writeRequestsCount = 0;
long storefileIndexSize = 0;
long totalStaticIndexSize = 0;
long totalStaticBloomSize = 0;
for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
HRegion r = e.getValue();
memstoreSize += r.memstoreSize.get();
@ -1208,20 +1223,24 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
Store store = ee.getValue();
storefiles += store.getStorefilesCount();
storefileIndexSize += store.getStorefilesIndexSize();
totalStaticIndexSize += store.getTotalStaticIndexSize();
totalStaticBloomSize += store.getTotalStaticBloomSize();
}
}
}
this.metrics.stores.set(stores);
this.metrics.storefiles.set(storefiles);
this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
this.metrics.storefileIndexSizeMB.set(
(int) (storefileIndexSize / (1024 * 1024)));
this.metrics.rootIndexSizeKB.set(
(int) (storefileIndexSize / 1024));
this.metrics.totalStaticIndexSizeKB.set(
(int) (totalStaticIndexSize / 1024));
this.metrics.totalStaticBloomSizeKB.set(
(int) (totalStaticBloomSize / 1024));
this.metrics.readRequestsCount.set(readRequestsCount);
this.metrics.writeRequestsCount.set(writeRequestsCount);
this.metrics.storefileIndexSizeMB
.set((int) (storefileIndexSize / (1024 * 1024)));
this.metrics.compactionQueueSize.set(compactSplitThread
.getCompactionQueueSize());
this.metrics.flushQueueSize.set(cacheFlusher
.getFlushQueueSize());
BlockCache blockCache = StoreFile.getBlockCache(conf);
if (blockCache != null) {


@ -24,10 +24,8 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -59,7 +57,6 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* A Store holds a column family in a Region. It's a memstore and a set of zero
@ -342,7 +339,7 @@ public class Store implements HeapSize {
try {
LOG.info("Validating hfile at " + srcPath + " for inclusion in "
+ "store " + this + " region " + this.region);
reader = new HFile.Reader(srcPath.getFileSystem(conf),
reader = HFile.createReader(srcPath.getFileSystem(conf),
srcPath, null, false, false);
reader.loadFileInfo();
@ -556,8 +553,7 @@ public class Store implements HeapSize {
throws IOException {
return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
compression, this.comparator, this.conf,
this.family.getBloomFilterType(), maxKeyCount,
conf.getBoolean("hbase.rs.cacheblocksonwrite", false));
this.family.getBloomFilterType(), maxKeyCount);
}
/*
@ -1574,6 +1570,37 @@ public class Store implements HeapSize {
return size;
}
/**
* Returns the total size of all index blocks in the data block indexes,
* including the root level, intermediate levels, and the leaf level for
* multi-level indexes, or just the root level for single-level indexes.
*
* @return the total size of block indexes in the store
*/
long getTotalStaticIndexSize() {
long size = 0;
for (StoreFile s : storefiles) {
size += s.getReader().getUncompressedDataIndexSize();
}
return size;
}
/**
* Returns the total byte size of all Bloom filter bit arrays. For compound
* Bloom filters, even Bloom blocks that are not currently loaded into the
* block cache are counted.
*
* @return the total size of all Bloom filters in the store
*/
long getTotalStaticBloomSize() {
long size = 0;
for (StoreFile s : storefiles) {
StoreFile.Reader r = s.getReader();
size += r.getTotalBloomSize();
}
return size;
}
/**
* @return The priority that this store should have in the compaction queue
*/


@ -19,12 +19,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.DataInput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -52,11 +52,12 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableUtils;
@ -69,10 +70,10 @@ import com.google.common.collect.Ordering;
/**
* A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To
* create, call {@link #createWriter(FileSystem, Path, int)} and append data. Be
* sure to add any metadata before calling close on the Writer
* (Use the appendMetadata convenience methods). On close, a StoreFile is
* sitting in the Filesystem. To refer to it, create a StoreFile instance
* create, call {@link #createWriter(FileSystem, Path, int, Configuration)}
* and append data. Be sure to add any metadata before calling close on the
* Writer (Use the appendMetadata convenience methods). On close, a StoreFile
* is sitting in the Filesystem. To refer to it, create a StoreFile instance
* passing filesystem and path. To read, call {@link #createReader()}.
* <p>StoreFiles may also reference store files in another Store.
*
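A minimal sketch of the write lifecycle described in the class comment above, assuming only the createWriter(FileSystem, Path, int, Configuration), append(KeyValue), appendFileInfo and close() calls that appear later in this file; the output directory and file-info key are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

public class StoreFileWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path dir = new Path("/tmp/storefile-sketch");   // hypothetical output directory

    // Create a writer with default compression and comparator.
    StoreFile.Writer w = StoreFile.createWriter(fs, dir,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
    w.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value")));
    // Add any metadata before close().
    w.appendFileInfo(Bytes.toBytes("SKETCH_META"), Bytes.toBytes("v"));   // hypothetical key
    w.close();
  }
}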
@ -82,11 +83,6 @@ import com.google.common.collect.Ordering;
public class StoreFile {
static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
// Config keys.
static final String IO_STOREFILE_BLOOM_ERROR_RATE = "io.storefile.bloom.error.rate";
static final String IO_STOREFILE_BLOOM_MAX_FOLD = "io.storefile.bloom.max.fold";
static final String IO_STOREFILE_BLOOM_MAX_KEYS = "io.storefile.bloom.max.keys";
static final String IO_STOREFILE_BLOOM_ENABLED = "io.storefile.bloom.enabled";
static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size";
public static enum BloomType {
@ -103,21 +99,26 @@ public class StoreFile {
*/
ROWCOL
}
// Keys for fileinfo values in HFile
/** Max Sequence ID in FileInfo */
public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
/** Major compaction flag in FileInfo */
public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
public static final byte[] MAJOR_COMPACTION_KEY =
Bytes.toBytes("MAJOR_COMPACTION_KEY");
/** Bloom filter Type in FileInfo */
static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
static final byte[] BLOOM_FILTER_TYPE_KEY =
Bytes.toBytes("BLOOM_FILTER_TYPE");
/** Last Bloom filter key in FileInfo */
private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
/** Key for Timerange information in metadata*/
public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
/** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
/** Meta data block name for bloom filter data (ie: bloom bits) */
static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
// Need to make it 8k for testing.
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@ -126,14 +127,19 @@ public class StoreFile {
private static BlockCache hfileBlockCache = null;
private final FileSystem fs;
// This file's path.
private final Path path;
// If this storefile references another, this is the reference instance.
private Reference reference;
// If this StoreFile references another, this is the other files path.
private Path referencePath;
// Should the block cache be used or not.
private boolean blockcache;
// Is this from an in-memory store
private boolean inMemory;
@ -204,11 +210,12 @@ public class StoreFile {
}
// ignore if the column family config says "no bloom filter"
// even if there is one in the hfile.
if (conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
if (BloomFilterFactory.isBloomEnabled(conf)) {
this.bloomType = bt;
} else {
LOG.info("Ignoring bloom filter check for file " + path + ": " +
"bloomType=" + bt + " (disabled in config)");
this.bloomType = BloomType.NONE;
LOG.info("Ignoring bloom filter check for file (disabled in config)");
}
// cache the modification time stamp of this store file
@ -393,7 +400,7 @@ public class StoreFile {
} else {
this.reader = new Reader(this.fs, this.path, getBlockCache(),
this.inMemory,
this.conf.getBoolean("hbase.rs.evictblocksonclose", true));
this.conf.getBoolean(HFile.EVICT_BLOCKS_ON_CLOSE_KEY, true));
}
// Load up indices and fileinfo.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@ -541,13 +548,10 @@ public class StoreFile {
* @return StoreFile.Writer
* @throws IOException
*/
public static Writer createWriter(final FileSystem fs,
final Path dir,
final int blocksize)
throws IOException {
return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0,
false);
public static Writer createWriter(final FileSystem fs, final Path dir,
final int blocksize, Configuration conf) throws IOException {
return createWriter(fs, dir, blocksize, null, null, conf, BloomType.NONE,
0);
}
/**
@ -558,10 +562,10 @@ public class StoreFile {
* Creates a file with a unique name in this directory.
* @param blocksize
* @param algorithm Pass null to get default.
* @param c Pass null to get default.
* @param conf HBase system configuration, used with Bloom filters
* @param bloomType column family setting for bloom filters
* @param c Pass null to get default.
* @param maxKeySize peak theoretical entry size (maintains error rate)
* @param maxKeyCount estimated maximum number of keys we expect to add
* @return HFile.Writer
* @throws IOException
*/
@ -572,22 +576,20 @@ public class StoreFile {
final KeyValue.KVComparator c,
final Configuration conf,
BloomType bloomType,
int maxKeySize,
final boolean cacheOnWrite)
long maxKeyCount)
throws IOException {
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
Path path = getUniqueFile(fs, dir);
if(conf == null || !conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
if (!BloomFilterFactory.isBloomEnabled(conf)) {
bloomType = BloomType.NONE;
}
return new Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize,
cacheOnWrite);
conf, c == null ? KeyValue.COMPARATOR: c, bloomType, maxKeyCount);
}
/**
@ -677,11 +679,13 @@ public class StoreFile {
* local because it is an implementation detail of the HBase regionserver.
*/
public static class Writer {
private final BloomFilter bloomFilter;
private final BloomFilterWriter bloomFilterWriter;
private final BloomType bloomType;
private byte[] lastBloomKey;
private int lastBloomKeyOffset, lastBloomKeyLen;
private KVComparator kvComparator;
private KeyValue lastKv = null;
private byte[] lastByteArray = null;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
* When flushing a memstore, we set TimeRange and use this variable to
@ -701,59 +705,30 @@ public class StoreFile {
* @param conf user configuration
* @param comparator key comparator
* @param bloomType bloom filter setting
* @param maxKeys maximum amount of keys to add (for blooms)
* @param cacheOnWrite whether to cache blocks as we write file
* @param maxKeys the expected maximum number of keys to be added. Was used
* for Bloom filter size in {@link HFile} format version 1.
* @throws IOException problem writing to FS
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress, final Configuration conf,
final KVComparator comparator, BloomType bloomType, int maxKeys,
boolean cacheOnWrite)
final KVComparator comparator, BloomType bloomType, long maxKeys)
throws IOException {
writer = new HFile.Writer(fs, path, blocksize, compress,
comparator.getRawComparator(),
cacheOnWrite ? StoreFile.getBlockCache(conf) : null);
writer = HFile.getWriterFactory(conf).createWriter(
fs, path, blocksize,
compress, comparator.getRawComparator());
this.kvComparator = comparator;
BloomFilter bloom = null;
BloomType bt = BloomType.NONE;
if (bloomType != BloomType.NONE && conf != null) {
float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
// Since in row+col blooms we have 2 calls to shouldSeek() instead of 1
// and the false positives are adding up, we should keep the error rate
// twice as low in order to maintain the number of false positives as
// desired by the user
if (bloomType == BloomType.ROWCOL) {
err /= 2;
}
int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7);
int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS, 128*1000*1000);
if (maxKeys < tooBig) {
try {
bloom = new ByteBloomFilter(maxKeys, err,
Hash.getHashType(conf), maxFold);
bloom.allocBloom();
bt = bloomType;
} catch (IllegalArgumentException iae) {
LOG.warn(String.format(
"Parse error while creating bloom for %s (%d, %f)",
path, maxKeys, err), iae);
bloom = null;
bt = BloomType.NONE;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping bloom filter because max keysize too large: "
+ maxKeys);
}
}
bloomFilterWriter = BloomFilterFactory.createBloomAtWrite(conf,
bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
if (bloomFilterWriter != null) {
this.bloomType = bloomType;
LOG.info("Bloom filter type for " + path + ": " + this.bloomType +
", "+ bloomFilterWriter.getClass().getSimpleName());
} else {
// Not using Bloom filters.
this.bloomType = BloomType.NONE;
}
this.bloomFilter = bloom;
this.bloomType = bt;
}
/**
@ -812,7 +787,7 @@ public class StoreFile {
}
public void append(final KeyValue kv) throws IOException {
if (this.bloomFilter != null) {
if (this.bloomFilterWriter != null) {
// only add to the bloom filter on a new, unique key
boolean newKey = true;
if (this.lastKv != null) {
@ -836,24 +811,42 @@ public class StoreFile {
* 1. Row = Row
* 2. RowCol = Row + Qualifier
*/
byte[] bloomKey;
int bloomKeyOffset, bloomKeyLen;
switch (bloomType) {
case ROW:
this.bloomFilter.add(kv.getBuffer(), kv.getRowOffset(),
kv.getRowLength());
bloomKey = kv.getBuffer();
bloomKeyOffset = kv.getRowOffset();
bloomKeyLen = kv.getRowLength();
break;
case ROWCOL:
// merge(row, qualifier)
int ro = kv.getRowOffset();
int rl = kv.getRowLength();
int qo = kv.getQualifierOffset();
int ql = kv.getQualifierLength();
byte [] result = new byte[rl + ql];
System.arraycopy(kv.getBuffer(), ro, result, 0, rl);
System.arraycopy(kv.getBuffer(), qo, result, rl, ql);
this.bloomFilter.add(result);
// TODO: could save one buffer copy in case of compound Bloom
// filters when this involves creating a KeyValue
bloomKey = bloomFilterWriter.createBloomKey(kv.getBuffer(),
kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
kv.getQualifierOffset(), kv.getQualifierLength());
bloomKeyOffset = 0;
bloomKeyLen = bloomKey.length;
break;
default:
throw new IOException("Invalid Bloom filter type: " + bloomType);
}
bloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
if (lastBloomKey != null
&& bloomFilterWriter.getComparator().compare(bloomKey,
bloomKeyOffset, bloomKeyLen, lastBloomKey,
lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
throw new IOException("Non-increasing Bloom keys: "
+ Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
+ " after "
+ Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
lastBloomKeyLen));
}
lastBloomKey = bloomKey;
lastBloomKeyOffset = bloomKeyOffset;
lastBloomKeyLen = bloomKeyLen;
this.lastKv = kv;
}
}
@ -866,39 +859,41 @@ public class StoreFile {
}
boolean hasBloom() {
return this.bloomFilter != null;
return this.bloomFilterWriter != null;
}
public void append(final byte [] key, final byte [] value) throws IOException {
if (this.bloomFilter != null) {
// only add to the bloom filter on a new row
if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
this.bloomFilter.add(key);
this.lastByteArray = key;
}
}
writer.append(key, value);
includeInTimeRangeTracker(key);
/**
* For unit testing only.
* @return the Bloom filter used by this writer.
*/
BloomFilterWriter getBloomWriter() {
return bloomFilterWriter;
}
public void close() throws IOException {
// make sure we wrote something to the bloom before adding it
if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) {
bloomFilter.compactBloom();
if (this.bloomFilter.getMaxKeys() > 0) {
int b = this.bloomFilter.getByteSize();
int k = this.bloomFilter.getKeyCount();
int m = this.bloomFilter.getMaxKeys();
StoreFile.LOG.info("Bloom added to HFile (" +
getPath() + "): " + StringUtils.humanReadableInt(b) + ", " +
k + "/" + m + " (" + NumberFormat.getPercentInstance().format(
((double)k) / ((double)m)) + ")");
// Make sure we wrote something to the Bloom filter before adding it.
boolean haveBloom = bloomFilterWriter != null &&
bloomFilterWriter.getKeyCount() > 0;
if (haveBloom) {
bloomFilterWriter.compactBloom();
writer.addBloomFilter(bloomFilterWriter);
writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
Bytes.toBytes(bloomType.toString()));
if (lastBloomKey != null) {
writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
+ lastBloomKeyLen));
}
writer.appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter());
writer.appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter());
writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
}
writer.close();
// Log final Bloom filter statistics. This needs to be done after close()
// because compound Bloom filters might be finalized as part of closing.
if (haveBloom && bloomFilterWriter.getMaxKeys() > 0) {
StoreFile.LOG.info("Bloom added to HFile ("
+ getPath() + "): " +
bloomFilterWriter.toString().replace("\n", "; "));
}
}
public void appendFileInfo(byte[] key, byte[] value) throws IOException {
@ -917,11 +912,12 @@ public class StoreFile {
private final HFile.Reader reader;
protected TimeRangeTracker timeRangeTracker = null;
protected long sequenceID = -1;
private byte[] lastBloomKey;
public Reader(FileSystem fs, Path path, BlockCache blockCache,
boolean inMemory, boolean evictOnClose)
throws IOException {
reader = new HFile.Reader(fs, path, blockCache, inMemory, evictOnClose);
reader = HFile.createReader(fs, path, blockCache, inMemory, evictOnClose);
bloomFilterType = BloomType.NONE;
}
@ -966,7 +962,7 @@ public class StoreFile {
}
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns));
return (passesTimerangeFilter(scan) && passesBloomFilter(scan, columns));
}
/**
@ -982,42 +978,82 @@ public class StoreFile {
}
}
private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
BloomFilter bm = this.bloomFilter;
if (bm == null || !scan.isGetScan()) {
private boolean passesBloomFilter(Scan scan,
final SortedSet<byte[]> columns) {
BloomFilter bloomFilter = this.bloomFilter;
if (bloomFilter == null) {
return true;
}
// Empty file?
if (reader.getTrailer().getEntryCount() == 0)
return false;
byte[] row = scan.getStartRow();
byte[] key;
switch (this.bloomFilterType) {
case ROW:
key = row;
break;
case ROWCOL:
if (columns != null && columns.size() == 1) {
byte[] col = columns.first();
key = Bytes.add(row, col);
byte[] column = columns.first();
key = bloomFilter.createBloomKey(row, 0, row.length,
column, 0, column.length);
break;
}
//$FALL-THROUGH$
return true;
default:
return true;
}
try {
ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
if (bloom != null) {
if (this.bloomFilterType == BloomType.ROWCOL) {
boolean shouldCheckBloom;
ByteBuffer bloom;
if (bloomFilter.supportsAutoLoading()) {
bloom = null;
shouldCheckBloom = true;
} else {
bloom = reader.getMetaBlock(HFileWriterV1.BLOOM_FILTER_DATA_KEY,
true);
shouldCheckBloom = bloom != null;
}
if (shouldCheckBloom) {
boolean exists;
// Whether the primary Bloom key is greater than the last Bloom key
// from the file info. For row-column Bloom filters this is not yet
// a sufficient condition to return false.
boolean keyIsAfterLast = lastBloomKey != null
&& bloomFilter.getComparator().compare(key, lastBloomKey) > 0;
if (bloomFilterType == BloomType.ROWCOL) {
// Since a Row Delete is essentially a DeleteFamily applied to all
// columns, a file might be skipped if using a row+col Bloom filter.
// In order to ensure this file is included, an additional check is
// required using only the row portion of the Bloom key.
return bm.contains(key, bloom) ||
bm.contains(row, bloom);
}
else {
return bm.contains(key, bloom);
byte[] rowBloomKey = bloomFilter.createBloomKey(row, 0, row.length,
null, 0, 0);
if (keyIsAfterLast
&& bloomFilter.getComparator().compare(rowBloomKey,
lastBloomKey) > 0) {
exists = false;
} else {
exists =
this.bloomFilter.contains(key, 0, key.length, bloom) ||
this.bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
bloom);
}
} else {
exists = !keyIsAfterLast
&& this.bloomFilter.contains(key, 0, key.length, bloom);
}
return exists;
}
} catch (IOException e) {
LOG.error("Error reading bloom filter data -- proceeding without",
@ -1039,6 +1075,8 @@ public class StoreFile {
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
}
lastBloomKey = fi.get(LAST_BLOOM_KEY);
return fi;
}
@ -1048,16 +1086,17 @@ public class StoreFile {
}
try {
ByteBuffer b = reader.getMetaBlock(BLOOM_FILTER_META_KEY, false);
if (b != null) {
DataInput bloomMeta = reader.getBloomFilterMetadata();
if (bloomMeta != null) {
if (bloomFilterType == BloomType.NONE) {
throw new IOException("valid bloom filter type not found in FileInfo");
throw new IOException(
"valid bloom filter type not found in FileInfo");
}
this.bloomFilter = new ByteBloomFilter(b);
LOG.info("Loaded " + (bloomFilterType== BloomType.ROW? "row":"col")
+ " bloom filter metadata for " + reader.getName());
bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
LOG.info("Loaded " + bloomFilterType + " " +
bloomFilter.getClass().getSimpleName() + " metadata for " +
reader.getName());
}
} catch (IOException e) {
LOG.error("Error reading bloom filter meta -- proceeding without", e);
@ -1068,13 +1107,16 @@ public class StoreFile {
}
}
public int getFilterEntries() {
return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount()
: reader.getFilterEntries();
}
public ByteBuffer getMetaBlock(String bloomFilterDataKey, boolean cacheBlock) throws IOException {
return reader.getMetaBlock(bloomFilterDataKey, cacheBlock);
/**
* The number of Bloom filter entries in this store file, or an estimate
* thereof, if the Bloom filter is not loaded. This always returns an upper
* bound of the number of Bloom filter entries.
*
* @return an estimate of the number of Bloom filter entries in this file
*/
public long getFilterEntries() {
return bloomFilter != null ? bloomFilter.getKeyCount()
: reader.getEntries();
}
public void setBloomFilterFaulty() {
@ -1094,10 +1136,10 @@ public class StoreFile {
}
public long getTotalUncompressedBytes() {
return reader.getTotalUncompressedBytes();
return reader.getTrailer().getTotalUncompressedBytes();
}
public int getEntries() {
public long getEntries() {
return reader.getEntries();
}
@ -1120,6 +1162,28 @@ public class StoreFile {
public void setSequenceID(long sequenceID) {
this.sequenceID = sequenceID;
}
BloomFilter getBloomFilter() {
return bloomFilter;
}
long getUncompressedDataIndexSize() {
return reader.getTrailer().getUncompressedDataIndexSize();
}
public long getTotalBloomSize() {
if (bloomFilter == null)
return 0;
return bloomFilter.getByteSize();
}
public int getHFileVersion() {
return reader.getTrailer().getVersion();
}
HFile.Reader getHFileReader() {
return reader;
}
}
/**
@ -1171,4 +1235,5 @@ public class StoreFile {
}
});
}
}


@ -138,11 +138,22 @@ public class RegionServerMetrics implements Updater {
public final MetricsLongValue writeRequestsCount = new MetricsLongValue("writeRequestsCount", registry);
/**
* Sum of all the storefile index sizes in this regionserver in MB
*/
public final MetricsIntValue storefileIndexSizeMB =
new MetricsIntValue("storefileIndexSizeMB", registry);
/** The total size of block index root levels in this regionserver in KB. */
public final MetricsIntValue rootIndexSizeKB =
new MetricsIntValue("rootIndexSizeKB", registry);
/** Total size of all block indexes, in KB (not necessarily loaded in memory) */
public final MetricsIntValue totalStaticIndexSizeKB =
new MetricsIntValue("totalStaticIndexSizeKB", registry);
/** Total size of all Bloom filters, in KB (not necessarily loaded in memory) */
public final MetricsIntValue totalStaticBloomSizeKB =
new MetricsIntValue("totalStaticBloomSizeKB", registry);
/**
* Sum of all the memstore sizes in this regionserver in MB
*/
@ -252,6 +263,9 @@ public class RegionServerMetrics implements Updater {
this.stores.pushMetric(this.metricsRecord);
this.storefiles.pushMetric(this.metricsRecord);
this.storefileIndexSizeMB.pushMetric(this.metricsRecord);
this.rootIndexSizeKB.pushMetric(this.metricsRecord);
this.totalStaticIndexSizeKB.pushMetric(this.metricsRecord);
this.totalStaticBloomSizeKB.pushMetric(this.metricsRecord);
this.memstoreSizeMB.pushMetric(this.metricsRecord);
this.readRequestsCount.pushMetric(this.metricsRecord);
this.writeRequestsCount.pushMetric(this.metricsRecord);
@ -278,9 +292,9 @@ public class RegionServerMetrics implements Updater {
// }
// Means you can't pass a numOps of zero or you get an ArithmeticException (divide by zero).
int ops = (int)HFile.getReadOps();
if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTime());
if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTimeMs());
ops = (int)HFile.getWriteOps();
if (ops != 0) this.fsWriteLatency.inc(ops, HFile.getWriteTime());
if (ops != 0) this.fsWriteLatency.inc(ops, HFile.getWriteTimeMs());
// mix in HLog metrics
ops = (int)HLog.getWriteOps();
if (ops != 0) this.fsWriteLatency.inc(ops, HLog.getWriteTime());
@ -356,6 +370,12 @@ public class RegionServerMetrics implements Updater {
Integer.valueOf(this.storefiles.get()));
sb = Strings.appendKeyValue(sb, "storefileIndexSize",
Integer.valueOf(this.storefileIndexSizeMB.get()));
sb = Strings.appendKeyValue(sb, "rootIndexSizeKB",
Integer.valueOf(this.rootIndexSizeKB.get()));
sb = Strings.appendKeyValue(sb, "totalStaticIndexSizeKB",
Integer.valueOf(this.totalStaticIndexSizeKB.get()));
sb = Strings.appendKeyValue(sb, "totalStaticBloomSizeKB",
Integer.valueOf(this.totalStaticBloomSizeKB.get()));
sb = Strings.appendKeyValue(sb, "memstoreSize",
Integer.valueOf(this.memstoreSizeMB.get()));
sb = Strings.appendKeyValue(sb, "readRequestsCount",


@ -19,61 +19,28 @@
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.io.Writable;
import java.nio.ByteBuffer;
/**
* Defines the general behavior of a bloom filter.
* <p>
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
* the networking research community in the past decade thanks to the bandwidth efficiencies that it
* offers for the transmission of set membership information between networked hosts. A sender encodes
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
* representation. Computation and space costs for construction are linear in the number of elements.
* The receiver uses the filter to test whether various elements are members of the set. Though the
* filter will occasionally return a false positive, it will never return a false negative. When creating
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
*
* <p>
* Originally created by
* <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
* The Bloom filter is a data structure that was introduced in 1970 and that
* has been adopted by the networking research community in the past decade
* thanks to the bandwidth efficiencies that it offers for the transmission of
* set membership information between networked hosts. A sender encodes the
* information into a bit vector, the Bloom filter, that is more compact than a
* conventional representation. Computation and space costs for construction
* are linear in the number of elements. The receiver uses the filter to test
* whether various elements are members of the set. Though the filter will
* occasionally return a false positive, it will never return a false negative.
* When creating the filter, the sender can choose its desired point in a
* trade-off between the false positive rate and the size.
*
* <p>
* It must be extended in order to define the real behavior.
* @see {@link BloomFilterWriter} for the ability to add elements to a Bloom
* filter
*/
public interface BloomFilter {
/**
* Allocate memory for the bloom filter data. Note that bloom data isn't
* allocated by default because it can grow large & reads would be better
* managed by the LRU cache.
*/
void allocBloom();
/**
* Add the specified binary to the bloom filter.
*
* @param buf data to be added to the bloom
*/
void add(byte []buf);
/**
* Add the specified binary to the bloom filter.
*
* @param buf data to be added to the bloom
* @param offset offset into the data to be added
* @param len length of the data to be added
*/
void add(byte []buf, int offset, int len);
/**
* Check if the specified key is contained in the bloom filter.
*
* @param buf data to check for existence of
* @param bloom bloom filter data to search
* @return true if matched by bloom, false if not
*/
boolean contains(byte [] buf, ByteBuffer bloom);
public interface BloomFilter extends BloomFilterBase {
/**
* Check if the specified key is contained in the bloom filter.
@ -81,41 +48,16 @@ public interface BloomFilter {
* @param buf data to check for existence of
* @param offset offset into the data
* @param length length of the data
* @param bloom bloom filter data to search
* @param bloom bloom filter data to search. This can be null if auto-loading
* is supported.
* @return true if matched by bloom, false if not
*/
boolean contains(byte [] buf, int offset, int length, ByteBuffer bloom);
/**
* @return The number of keys added to the bloom
* @return true if this Bloom filter can automatically load its data
* and thus allows a null byte buffer to be passed to contains()
*/
int getKeyCount();
boolean supportsAutoLoading();
/**
* @return The max number of keys that can be inserted
* to maintain the desired error rate
*/
public int getMaxKeys();
/**
* @return Size of the bloom, in bytes
*/
public int getByteSize();
/**
* Compact the bloom before writing metadata & data to disk
*/
void compactBloom();
/**
* Get a writable interface into bloom filter meta data.
* @return writable class
*/
Writable getMetaWriter();
/**
* Get a writable interface into bloom filter data (actual bloom).
* @return writable class
*/
Writable getDataWriter();
}
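A small sketch of how a reader is expected to drive the contract above: filters that support auto-loading are queried with a null bloom buffer, while others are handed the bloom data block explicitly. Only the two methods declared in this interface are assumed.

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.util.BloomFilter;

public class BloomFilterQuerySketch {
  /** Returns true if the key might be present, false only if it is definitely absent. */
  static boolean mightContain(BloomFilter bf, byte[] key, ByteBuffer bloomData) {
    // Auto-loading filters fetch their own bits on demand, so pass null;
    // otherwise the caller must supply the bloom data block.
    ByteBuffer bloom = bf.supportsAutoLoading() ? null : bloomData;
    return bf.contains(key, 0, key.length, bloom);
  }
}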


@ -20,41 +20,71 @@
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.Random;
/**
* Implements a <i>Bloom filter</i>, as defined by Bloom in 1970.
* <p>
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
* the networking research community in the past decade thanks to the bandwidth efficiencies that it
* offers for the transmission of set membership information between networked hosts. A sender encodes
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
* representation. Computation and space costs for construction are linear in the number of elements.
* The receiver uses the filter to test whether various elements are members of the set. Though the
* filter will occasionally return a false positive, it will never return a false negative. When creating
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
* The Bloom filter is a data structure that was introduced in 1970 and that has
* been adopted by the networking research community in the past decade thanks
* to the bandwidth efficiencies that it offers for the transmission of set
* membership information between networked hosts. A sender encodes the
* information into a bit vector, the Bloom filter, that is more compact than a
* conventional representation. Computation and space costs for construction are
* linear in the number of elements. The receiver uses the filter to test
* whether various elements are members of the set. Though the filter will
* occasionally return a false positive, it will never return a false negative.
* When creating the filter, the sender can choose its desired point in a
* trade-off between the false positive rate and the size.
*
* <p>
* Originally inspired by
* <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
* Originally inspired by <a href="http://www.one-lab.org">European Commission
* One-Lab Project 034819</a>.
*
* Bloom filters are very sensitive to the number of elements inserted into
* them. For HBase, the number of entries depends on the size of the data stored
* in the column. Currently the default region size is 256MB, so entry count ~=
* 256MB / (average value size for column). Despite this rule of thumb, there is
* no efficient way to calculate the entry count after compactions. Therefore,
* it is often easier to use a dynamic bloom filter that will add extra space
* instead of allowing the error rate to grow.
*
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
*
* m denotes the number of bits in the Bloom filter (bitSize),
* n denotes the number of elements inserted into the Bloom filter (maxKeys),
* k represents the number of hash functions used (nbHash), and
* e represents the desired false positive rate for the bloom (err).
*
* If we fix the error rate (e) and know the number of entries, then the
* optimal bloom size is m = -(n * ln(err)) / (ln(2)^2) ~= n * ln(err) / ln(0.6185).
*
* The probability of false positives is minimized when k = m/n ln(2).
*
* @see BloomFilter The general behavior of a filter
*
* @see <a href="http://portal.acm.org/citation.cfm?id=362692&dl=ACM&coll=portal">Space/Time Trade-Offs in Hash Coding with Allowable Errors</a>
* @see <a
* href="http://portal.acm.org/citation.cfm?id=362692&dl=ACM&coll=portal">
* Space/Time Trade-Offs in Hash Coding with Allowable Errors</a>
*/
public class ByteBloomFilter implements BloomFilter {
public class ByteBloomFilter implements BloomFilter, BloomFilterWriter {
/** Current file format version */
public static final int VERSION = 1;
/** Bytes (B) in the array */
/** Bytes (B) in the array. This actually has to fit into an int. */
protected long byteSize;
/** Number of hash functions */
protected final int hashCount;
protected int hashCount;
/** Hash type */
protected final int hashType;
/** Hash Function */
@ -66,6 +96,21 @@ public class ByteBloomFilter implements BloomFilter {
/** Bloom bits */
protected ByteBuffer bloom;
/** Record separator for the Bloom filter statistics human-readable string */
public static final String STATS_RECORD_SEP = "; ";
/**
* Used in computing the optimal Bloom filter size. This approximately equals
* 0.480453.
*/
public static final double LOG2_SQUARED = Math.log(2) * Math.log(2);
/**
* A random number generator to use for "fake lookups" when testing to
* estimate the ideal false positive rate.
*/
private static Random randomGeneratorForTest;
/** Bit-value lookup array to prevent doing the same work over and over */
private static final byte [] bitvals = {
(byte) 0x01,
@ -76,86 +121,209 @@ public class ByteBloomFilter implements BloomFilter {
(byte) 0x20,
(byte) 0x40,
(byte) 0x80
};
};
/**
* Loads bloom filter meta data from file input.
* @param meta stored bloom meta data
* @throws IllegalArgumentException meta data is invalid
*/
public ByteBloomFilter(ByteBuffer meta)
throws IllegalArgumentException {
int version = meta.getInt();
if (version != VERSION) throw new IllegalArgumentException("Bad version");
this.byteSize = meta.getInt();
this.hashCount = meta.getInt();
this.hashType = meta.getInt();
this.keyCount = meta.getInt();
public ByteBloomFilter(DataInput meta)
throws IOException, IllegalArgumentException {
this.byteSize = meta.readInt();
this.hashCount = meta.readInt();
this.hashType = meta.readInt();
this.keyCount = meta.readInt();
this.maxKeys = this.keyCount;
this.hash = Hash.getInstance(this.hashType);
if (hash == null) {
throw new IllegalArgumentException("Invalid hash type: " + hashType);
}
sanityCheck();
}
/**
* Determines & initializes bloom filter meta data from user config. Call
* @param maxKeys
* @param errorRate
* @return the number of bits for a Bloom filter that can hold the given
* number of keys and provide the given error rate, assuming that the
* optimal number of hash functions is used and it does not have to
* be an integer.
*/
public static long computeBitSize(long maxKeys, double errorRate) {
return (long) Math.ceil(maxKeys * (-Math.log(errorRate) / LOG2_SQUARED));
}
/**
* The maximum number of keys we can put into a Bloom filter of a certain
* size to maintain the given error rate, assuming the number of hash
* functions is chosen optimally and does not even have to be an integer
* (hence the "ideal" in the function name).
*
* @param bitSize
* @param errorRate
* @return maximum number of keys that can be inserted into the Bloom filter
* @see {@link #computeMaxKeys(long, double, int)} for a more precise
* estimate
*/
public static long idealMaxKeys(long bitSize, double errorRate) {
// The reason we need to use floor here is that otherwise we might put
// more keys in a Bloom filter than is allowed by the target error rate.
return (long) (bitSize * (LOG2_SQUARED / -Math.log(errorRate)));
}
/**
* The maximum number of keys we can put into a Bloom filter of a certain
* size to get the given error rate, with the given number of hash functions.
*
* @param bitSize
* @param errorRate
* @param hashCount
* @return the maximum number of keys that can be inserted in a Bloom filter
* to maintain the target error rate, if the number of hash functions
* is provided.
*/
public static long computeMaxKeys(long bitSize, double errorRate,
int hashCount) {
return (long) (-bitSize * 1.0 / hashCount *
Math.log(1 - Math.exp(Math.log(errorRate) / hashCount)));
}
/**
* Computes the error rate for this Bloom filter, taking into account the
* actual number of hash functions and keys inserted. The return value of
* this function changes as a Bloom filter is being populated. Used for
* reporting the actual error rate of compound Bloom filters when writing
* them out.
*
* @return error rate for this particular Bloom filter
*/
public double actualErrorRate() {
return actualErrorRate(keyCount, byteSize * 8, hashCount);
}
/**
* Computes the actual error rate for the given number of elements, number
* of bits, and number of hash functions. Taken directly from the
* <a href=
* "http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives"
* > Wikipedia Bloom filter article</a>.
*
* @param maxKeys
* @param bitSize
* @param functionCount
* @return the actual error rate
*/
public static double actualErrorRate(long maxKeys, long bitSize,
int functionCount) {
return Math.exp(Math.log(1 - Math.exp(-functionCount * maxKeys * 1.0
/ bitSize)) * functionCount);
}
/**
* Increases the given byte size of a Bloom filter until it can be folded by
* the given factor.
*
* @param bitSize
* @param foldFactor
* @return
*/
public static int computeFoldableByteSize(long bitSize, int foldFactor) {
long byteSizeLong = (bitSize + 7) / 8;
int mask = (1 << foldFactor) - 1;
if ((mask & byteSizeLong) != 0) {
byteSizeLong >>= foldFactor;
++byteSizeLong;
byteSizeLong <<= foldFactor;
}
if (byteSizeLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteSize=" + byteSizeLong + " too "
+ "large for bitSize=" + bitSize + ", foldFactor=" + foldFactor);
}
return (int) byteSizeLong;
}
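To make the sizing formulas in the class comment concrete, a small worked example (illustrative only) using the public helpers defined above: one million keys at a 1% target false positive rate need roughly 9.59 million bits (about 1.2 MB) and 7 hash functions, and 1000 bits round up to 128 bytes when folding by a factor of 3.

import org.apache.hadoop.hbase.util.ByteBloomFilter;

public class BloomSizingExample {
  public static void main(String[] args) {
    long n = 1000000L;                                        // number of keys
    double err = 0.01;                                        // target false positive rate
    long m = ByteBloomFilter.computeBitSize(n, err);          // ~9.59 million bits
    int k = (int) Math.ceil(Math.log(2) * ((double) m / n));  // optimal k = m/n * ln(2) -> 7
    System.out.println("bits=" + m + ", hashCount=" + k
        + ", actualErrorRate=" + ByteBloomFilter.actualErrorRate(n, m, k));
    // Rounding 1000 bits up so the resulting filter can be folded by a factor of 3:
    System.out.println("foldableBytes="
        + ByteBloomFilter.computeFoldableByteSize(1000, 3));  // (1000+7)/8 = 125 -> 128
  }
}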
private static int optimalFunctionCount(int maxKeys, long bitSize) {
return (int) Math.ceil(Math.log(2) * (bitSize / maxKeys));
}
/** Private constructor used by other constructors. */
private ByteBloomFilter(int hashType) {
this.hashType = hashType;
this.hash = Hash.getInstance(hashType);
}
/**
* Determines & initializes bloom filter meta data from user config. Call
* {@link #allocBloom()} to allocate bloom filter data.
   *
   * @param maxKeys Maximum expected number of keys that will be stored in this
   *          bloom
   * @param errorRate Desired false positive error rate. Lower rate = more
   *          storage required
   * @param hashType Type of hash function to use
   * @param foldFactor When finished adding entries, you may be able to 'fold'
   *          this bloom to save space. Tradeoff potentially excess bytes in
   *          bloom for ability to fold if keyCount is exponentially greater
   *          than maxKeys.
* @throws IllegalArgumentException
*/
public ByteBloomFilter(int maxKeys, float errorRate, int hashType, int foldFactor)
throws IllegalArgumentException {
/*
* Bloom filters are very sensitive to the number of elements inserted
* into them. For HBase, the number of entries depends on the size of the
* data stored in the column. Currently the default region size is 256MB,
* so entry count ~= 256MB / (average value size for column). Despite
* this rule of thumb, there is no efficient way to calculate the entry
* count after compactions. Therefore, it is often easier to use a
* dynamic bloom filter that will add extra space instead of allowing the
* error rate to grow.
*
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
*
* m denotes the number of bits in the Bloom filter (bitSize)
* n denotes the number of elements inserted into the Bloom filter (maxKeys)
* k represents the number of hash functions used (nbHash)
* e represents the desired false positive rate for the bloom (err)
*
     * If we fix the error rate (e) and know the number of entries (n), then
     * the optimal bloom size is m = -n * ln(err) / (ln(2)^2)
     * ~= n * ln(err) / ln(0.6185)
     *
     * The probability of false positives is minimized when k = (m/n) * ln(2).
*/
long bitSize = (long)Math.ceil(maxKeys * (Math.log(errorRate) / Math.log(0.6185)));
int functionCount = (int)Math.ceil(Math.log(2) * (bitSize / maxKeys));
public ByteBloomFilter(int maxKeys, double errorRate, int hashType,
int foldFactor) throws IllegalArgumentException {
this(hashType);
// increase byteSize so folding is possible
long byteSize = (bitSize + 7) / 8;
int mask = (1 << foldFactor) - 1;
if ( (mask & byteSize) != 0) {
byteSize >>= foldFactor;
++byteSize;
byteSize <<= foldFactor;
}
this.byteSize = byteSize;
this.hashCount = functionCount;
this.hashType = hashType;
this.keyCount = 0;
long bitSize = computeBitSize(maxKeys, errorRate);
hashCount = optimalFunctionCount(maxKeys, bitSize);
this.maxKeys = maxKeys;
this.hash = Hash.getInstance(hashType);
// increase byteSize so folding is possible
byteSize = computeFoldableByteSize(bitSize, foldFactor);
sanityCheck();
}
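  // Rough sizing intuition for the constructor above, assuming computeBitSize
  // follows the formula in the comment (illustrative numbers only): for
  // maxKeys = 1,000,000 and errorRate = 0.01,
  //   bitSize   ~= 1,000,000 * ln(0.01) / ln(0.6185) ~= 9,585,000 bits
  //             (about 9.6 bits per key, ~1.2 MB before folding), and
  //   hashCount  = ceil(ln(2) * bitSize / maxKeys) = 7.
  // computeFoldableByteSize() then pads the byte size up to a multiple of
  // 2^foldFactor so the finished filter can still be folded.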
/**
* Creates a Bloom filter of the given size.
*
* @param byteSizeHint the desired number of bytes for the Bloom filter bit
* array. Will be increased so that folding is possible.
* @param errorRate target false positive rate of the Bloom filter
* @param hashType Bloom filter hash function type
   * @param foldFactor the maximum number of times the filter may later be
   *          folded in half
* @return the new Bloom filter of the desired size
*/
public static ByteBloomFilter createBySize(int byteSizeHint,
double errorRate, int hashType, int foldFactor) {
ByteBloomFilter bbf = new ByteBloomFilter(hashType);
bbf.byteSize = computeFoldableByteSize(byteSizeHint * 8, foldFactor);
long bitSize = bbf.byteSize * 8;
bbf.maxKeys = (int) idealMaxKeys(bitSize, errorRate);
bbf.hashCount = optimalFunctionCount(bbf.maxKeys, bitSize);
// Adjust max keys to bring error rate closer to what was requested,
// because byteSize was adjusted to allow for folding, and hashCount was
// rounded.
bbf.maxKeys = (int) computeMaxKeys(bitSize, errorRate, bbf.hashCount);
return bbf;
}
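  // Minimal usage sketch for createBySize() (hypothetical values; the fold
  // factor of 7 and MURMUR_HASH are only examples, not requirements):
  //
  //   ByteBloomFilter bloom = ByteBloomFilter.createBySize(
  //       128 * 1024,        // byteSizeHint: one 128 KB Bloom chunk
  //       0.01,              // target false positive rate
  //       Hash.MURMUR_HASH,  // hash function type
  //       7);                // fold factor
  //   bloom.allocBloom();
  //   bloom.add(Bytes.toBytes("row-0001"));
  //   boolean maybePresent =
  //       bloom.contains(Bytes.toBytes("row-0001"), 0, 8, null);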
/**
* Creates another similar Bloom filter. Does not copy the actual bits, and
* sets the new filter's key count to zero.
*
* @return a Bloom filter with the same configuration as this
*/
public ByteBloomFilter createAnother() {
ByteBloomFilter bbf = new ByteBloomFilter(hashType);
bbf.byteSize = byteSize;
bbf.hashCount = hashCount;
bbf.maxKeys = maxKeys;
return bbf;
}
@Override
public void allocBloom() {
if (this.bloom != null) {
@ -190,7 +358,6 @@ public class ByteBloomFilter implements BloomFilter {
}
}
@Override
public void add(byte [] buf) {
add(buf, 0, buf.length);
}
@ -212,42 +379,64 @@ public class ByteBloomFilter implements BloomFilter {
++this.keyCount;
}
/**
* Should only be used in tests when writing a bloom filter.
*/
/** Should only be used in tests */
boolean contains(byte [] buf) {
return contains(buf, 0, buf.length, this.bloom);
}
/**
* Should only be used in tests when writing a bloom filter.
*/
/** Should only be used in tests */
boolean contains(byte [] buf, int offset, int length) {
return contains(buf, offset, length, this.bloom);
return contains(buf, offset, length, bloom);
}
/** Should only be used in tests */
boolean contains(byte[] buf, ByteBuffer bloom) {
return contains(buf, 0, buf.length, bloom);
}
@Override
public boolean contains(byte [] buf, ByteBuffer theBloom) {
return contains(buf, 0, buf.length, theBloom);
}
@Override
public boolean contains(byte [] buf, int offset, int length,
public boolean contains(byte[] buf, int offset, int length,
ByteBuffer theBloom) {
if(theBloom.limit() != this.byteSize) {
throw new IllegalArgumentException("Bloom does not match expected size");
if (theBloom == null) {
// In a version 1 HFile Bloom filter data is stored in a separate meta
// block which is loaded on demand, but in version 2 it is pre-loaded.
// We want to use the same API in both cases.
theBloom = bloom;
}
int hash1 = this.hash.hash(buf, offset, length, 0);
int hash2 = this.hash.hash(buf, offset, length, hash1);
if (theBloom.limit() != byteSize) {
throw new IllegalArgumentException("Bloom does not match expected size:"
+ " theBloom.limit()=" + theBloom.limit() + ", byteSize=" + byteSize);
}
for (int i = 0; i < this.hashCount; i++) {
long hashLoc = Math.abs((hash1 + i * hash2) % (this.byteSize * 8));
if (!get(hashLoc, theBloom) ) {
return false;
return contains(buf, offset, length, theBloom.array(),
theBloom.arrayOffset(), (int) byteSize, hash, hashCount);
}
public static boolean contains(byte[] buf, int offset, int length,
byte[] bloomArray, int bloomOffset, int bloomSize, Hash hash,
int hashCount) {
int hash1 = hash.hash(buf, offset, length, 0);
int hash2 = hash.hash(buf, offset, length, hash1);
int bloomBitSize = bloomSize * 8;
if (randomGeneratorForTest == null) {
// Production mode.
for (int i = 0; i < hashCount; i++) {
long hashLoc = Math.abs((hash1 + i * hash2) % bloomBitSize);
if (!get(hashLoc, bloomArray, bloomOffset))
return false;
}
} else {
// Test mode with "fake lookups" to estimate "ideal false positive rate".
for (int i = 0; i < hashCount; i++) {
long hashLoc = randomGeneratorForTest.nextInt(bloomBitSize);
if (!get(hashLoc, bloomArray, bloomOffset))
return false;
}
}
return true;
}
@ -273,27 +462,31 @@ public class ByteBloomFilter implements BloomFilter {
* @param pos index of bit
* @return true if bit at specified index is 1, false if 0.
*/
static boolean get(long pos, ByteBuffer theBloom) {
static boolean get(long pos, byte[] bloomArray, int bloomOffset) {
int bytePos = (int)(pos / 8);
int bitPos = (int)(pos % 8);
byte curByte = theBloom.get(bytePos);
byte curByte = bloomArray[bloomOffset + bytePos];
curByte &= bitvals[bitPos];
return (curByte != 0);
}
@Override
public int getKeyCount() {
return this.keyCount;
public long getKeyCount() {
return keyCount;
}
@Override
public int getMaxKeys() {
return this.maxKeys;
public long getMaxKeys() {
return maxKeys;
}
@Override
public int getByteSize() {
return (int)this.byteSize;
public long getByteSize() {
return byteSize;
}
public int getHashType() {
return hashType;
}
@Override
@ -367,7 +560,7 @@ public class ByteBloomFilter implements BloomFilter {
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(VERSION);
out.writeInt((int)byteSize);
out.writeInt((int) byteSize);
out.writeInt(hashCount);
out.writeInt(hashType);
out.writeInt(keyCount);
@ -387,4 +580,73 @@ public class ByteBloomFilter implements BloomFilter {
}
}
public int getHashCount() {
return hashCount;
}
@Override
public boolean supportsAutoLoading() {
return bloom != null;
}
public static void setFakeLookupMode(boolean enabled) {
if (enabled) {
randomGeneratorForTest = new Random(283742987L);
} else {
randomGeneratorForTest = null;
}
}
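  // Sketch of how a test might use fake-lookup mode to estimate the "ideal"
  // false positive rate (bloom, numQueries and randomKey() are hypothetical):
  //
  //   ByteBloomFilter.setFakeLookupMode(true);
  //   try {
  //     int falsePositives = 0;
  //     for (int i = 0; i < numQueries; ++i) {
  //       byte[] key = randomKey(i);
  //       if (bloom.contains(key, 0, key.length, null)) {
  //         ++falsePositives;
  //       }
  //     }
  //     // falsePositives / (double) numQueries should track actualErrorRate().
  //   } finally {
  //     ByteBloomFilter.setFakeLookupMode(false);
  //   }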
/**
* {@inheritDoc}
* Just concatenate row and column by default. May return the original row
* buffer if the column qualifier is empty.
*/
@Override
public byte[] createBloomKey(byte[] rowBuf, int rowOffset, int rowLen,
byte[] qualBuf, int qualOffset, int qualLen) {
// Optimize the frequent case when only the row is provided.
if (qualLen <= 0 && rowOffset == 0 && rowLen == rowBuf.length)
return rowBuf;
byte [] result = new byte[rowLen + qualLen];
System.arraycopy(rowBuf, rowOffset, result, 0, rowLen);
if (qualLen > 0)
System.arraycopy(qualBuf, qualOffset, result, rowLen, qualLen);
return result;
}
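  // Example of the resulting key layout (illustrative): for a ROWCOL Bloom
  // key with row = "r1" and qualifier = "q1",
  //   createBloomKey(row, 0, row.length, qual, 0, qual.length)
  // returns the 4-byte concatenation { 'r', '1', 'q', '1' }, while a row-only
  // lookup (qualLen == 0, whole-array row) returns rowBuf itself without
  // copying.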
@Override
public RawComparator<byte[]> getComparator() {
return Bytes.BYTES_RAWCOMPARATOR;
}
/**
* A human-readable string with statistics for the given Bloom filter.
*
* @param bloomFilter the Bloom filter to output statistics for;
* @return a string consisting of "&lt;key&gt;: &lt;value&gt;" parts
* separated by {@link #STATS_RECORD_SEP}.
*/
public static String formatStats(BloomFilterBase bloomFilter) {
StringBuilder sb = new StringBuilder();
long k = bloomFilter.getKeyCount();
long m = bloomFilter.getMaxKeys();
sb.append("BloomSize: " + bloomFilter.getByteSize() + STATS_RECORD_SEP);
sb.append("No of Keys in bloom: " + k + STATS_RECORD_SEP);
sb.append("Max Keys for bloom: " + m);
if (m > 0) {
sb.append(STATS_RECORD_SEP + "Percentage filled: "
+ NumberFormat.getPercentInstance().format(k * 1.0 / m));
}
return sb.toString();
}
@Override
public String toString() {
return formatStats(this) + STATS_RECORD_SEP + "Actual error rate: "
+ String.format("%.8f", actualErrorRate());
}
}

View File

@ -308,18 +308,24 @@ public class Bytes {
* @see #toStringBinary(byte[], int, int)
*/
public static String toStringBinary(final byte [] b) {
return toStringBinary(b, 0, b.length);
}
/**
* The same as {@link #toStringBinary(byte[])}, but returns a string "null"
* if given a null argument.
*/
public static String toStringBinarySafe(final byte [] b) {
if (b == null)
return "null";
return toStringBinary(b, 0, b.length);
}
/**
* Converts the given byte buffer, from its array offset to its limit, to
* a string. The position and the mark are ignored.
*
* @param buf a byte buffer
* @return a string representation of the buffer's binary contents
*/
public static String toStringBinary(ByteBuffer buf) {
if (buf == null)
return "null";
return toStringBinary(buf.array(), buf.arrayOffset(), buf.limit());
}
/**
* Write a printable representation of a byte array. Non-printable
* characters are hex escaped in the format \\x%02X, eg:
@ -1470,12 +1476,18 @@ public class Bytes {
/**
* Binary search for keys in indexes.
*
   * @param arr array of byte arrays to search through
   * @param key the key you want to find
   * @param offset the offset of the first byte of the key within its byte
   *          array
   * @param length the length of the key
   * @param comparator a comparator to compare keys with
* @return index of key
* @return zero-based index of the key, if the key is present in the array.
* Otherwise, a value -(i + 1) such that the key is between arr[i -
   *         1] and arr[i] non-inclusively, where i is in [0, N], if we define
* arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
* means that this function can return 2N + 1 different values
* ranging from -(N + 1) to N - 1.
*/
public static int binarySearch(byte [][]arr, byte []key, int offset,
int length, RawComparator<byte []> comparator) {
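    // Illustrative reading of the contract above, with offset/length/
    // comparator arguments omitted for brevity: for arr = {"b", "d", "f"}
    // (as byte arrays) and Bytes.BYTES_RAWCOMPARATOR,
    //   binarySearch(arr, "d") ->  1   (exact match at index 1)
    //   binarySearch(arr, "c") -> -2   (between arr[0] and arr[1], i = 1)
    //   binarySearch(arr, "a") -> -1   (before arr[0], i = 0)
    //   binarySearch(arr, "g") -> -4   (past arr[2], i = 3)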
@ -1584,4 +1596,34 @@ public class Bytes {
return value;
}
/**
* Writes a string as a fixed-size field, padded with zeros.
*/
public static void writeStringFixedSize(final DataOutput out, String s,
int size) throws IOException {
byte[] b = toBytes(s);
if (b.length > size) {
throw new IOException("Trying to write " + b.length + " bytes (" +
toStringBinary(b) + ") into a field of length " + size);
}
out.writeBytes(s);
for (int i = 0; i < size - s.length(); ++i)
out.writeByte(0);
}
/**
* Reads a fixed-size field and interprets it as a string padded with zeros.
*/
public static String readStringFixedSize(final DataInput in, int size)
throws IOException {
byte[] b = new byte[size];
in.readFully(b);
int n = b.length;
while (n > 0 && b[n - 1] == 0)
--n;
return toString(b, 0, n);
}
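  // Round-trip sketch for the two helpers above (illustrative only, using
  // plain java.io streams):
  //
  //   ByteArrayOutputStream baos = new ByteArrayOutputStream();
  //   Bytes.writeStringFixedSize(new DataOutputStream(baos), "cf", 8);
  //   // baos now holds { 'c', 'f', 0, 0, 0, 0, 0, 0 } -- exactly 8 bytes.
  //   DataInputStream in =
  //       new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
  //   String s = Bytes.readStringFixedSize(in, 8);   // "cf"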
}

View File

@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -80,6 +81,7 @@ public class CompressionTest {
}
}
Configuration conf = HBaseConfiguration.create();
try {
Compressor c = algo.getCompressor();
algo.returnCompressor(c);
@ -103,13 +105,14 @@ public class CompressionTest {
public static void doSmokeTest(FileSystem fs, Path path, String codec)
throws Exception {
HFile.Writer writer = new HFile.Writer(
Configuration conf = HBaseConfiguration.create();
HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(
fs, path, HFile.DEFAULT_BLOCKSIZE, codec, null);
writer.append(Bytes.toBytes("testkey"), Bytes.toBytes("testval"));
writer.appendFileInfo(Bytes.toBytes("infokey"), Bytes.toBytes("infoval"));
writer.close();
HFile.Reader reader = new HFile.Reader(fs, path, null, false, false);
HFile.Reader reader = HFile.createReader(fs, path, null, false, false);
reader.loadFileInfo();
byte[] key = reader.getFirstKey();
boolean rc = Bytes.toString(key).equals("testkey");

View File

@ -122,8 +122,8 @@ public abstract class Hash {
}
/**
* Calculate a hash using bytes from 0 to <code>length</code>, and
* the provided seed value
* Calculate a hash using bytes from <code>offset</code> to <code>offset +
* length</code>, and the provided seed value.
* @param bytes input bytes
* @param offset the offset into the array to start consideration
* @param length length of the valid bytes after offset to consider

View File

@ -487,8 +487,8 @@ public class RecoverableZooKeeper {
return path;
}
LOG.error("Node " + path + " already exists with " +
Bytes.toStringBinarySafe(currentData) + ", could not write " +
Bytes.toStringBinarySafe(data));
Bytes.toStringBinary(currentData) + ", could not write " +
Bytes.toStringBinary(data));
throw e;
}
LOG.error("Node " + path + " already exists and this is not a " +

View File

@ -446,6 +446,57 @@
Used by bloom filters.
</description>
</property>
<property>
<name>hfile.block.index.cacheonwrite</name>
<value>false</value>
<description>
      Allows non-root multi-level index blocks to be put into the block
      cache while the index is being written.
</description>
</property>
<property>
<name>hfile.index.block.max.size</name>
<value>131072</value>
<description>
When the size of a leaf-level, intermediate-level, or root-level
index block in a multi-level block index grows to this size, the
block is written out and a new block is started.
</description>
</property>
<property>
<name>hfile.format.version</name>
<value>2</value>
<description>
The HFile format version to use for new files. Set this to 1 to test
backwards-compatibility. The default value of this option should be
consistent with FixedFileTrailer.MAX_VERSION.
</description>
</property>
<property>
<name>io.storefile.bloom.block.size</name>
<value>131072</value>
<description>
The size in bytes of a single block ("chunk") of a compound Bloom
filter. This size is approximate, because Bloom blocks can only be
inserted at data block boundaries, and the number of keys per data
block varies.
</description>
</property>
<property>
<name>io.storefile.bloom.cacheonwrite</name>
<value>false</value>
<description>
Enables cache-on-write for inline blocks of a compound Bloom filter.
</description>
</property>
<property>
<name>hbase.rs.cacheblocksonwrite</name>
<value>false</value>
<description>
Whether an HFile block should be added to the block cache when the
block is finished.
</description>
</property>
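  <!--
    Illustrative only: the cache-on-write settings above can also be applied
    programmatically, e.g. in a test (Java, hypothetical snippet):

      Configuration conf = HBaseConfiguration.create();
      conf.setBoolean("hfile.block.index.cacheonwrite", true);
      conf.setBoolean("io.storefile.bloom.cacheonwrite", true);
  -->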
<property>
<name>hbase.rpc.engine</name>
<value>org.apache.hadoop.hbase.ipc.WritableRpcEngine</value>

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.migration.HRegionInfo090x;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -108,6 +109,19 @@ public class HBaseTestingUtility {
*/
public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
/** Compression algorithms to use in parameterized JUnit 4 tests */
public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
Arrays.asList(new Object[][] {
{ Compression.Algorithm.NONE },
{ Compression.Algorithm.GZ }
});
/** Compression algorithms to use in testing */
public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS =
new Compression.Algorithm[] {
Compression.Algorithm.NONE, Compression.Algorithm.GZ
};
public HBaseTestingUtility() {
this(HBaseConfiguration.create());
}
@ -135,6 +149,18 @@ public class HBaseTestingUtility {
return this.conf;
}
/**
* Makes sure the test directory is set up so that {@link #getTestDir()}
* returns a valid directory. Useful in unit tests that do not run a
* mini-cluster.
*/
public void initTestDir() {
if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
clusterTestBuildDir = setupClusterTestBuildDir();
System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDir.getPath());
}
}
/**
* @return Where to write test data on local filesystem; usually
* {@link #DEFAULT_TEST_DIRECTORY}

View File

@ -188,8 +188,9 @@ public class HFilePerformanceEvaluation {
@Override
void setUp() throws Exception {
writer = new HFile.Writer(this.fs, this.mf, RFILE_BLOCKSIZE,
(Compression.Algorithm) null, null, null);
writer = HFile.getWriterFactory(conf).createWriter(this.fs, this.mf,
RFILE_BLOCKSIZE,
(Compression.Algorithm) null, null);
}
@Override
@ -225,7 +226,7 @@ public class HFilePerformanceEvaluation {
@Override
void setUp() throws Exception {
reader = new HFile.Reader(this.fs, this.mf, null, false, false);
reader = HFile.createReader(this.fs, this.mf, null, false, false);
this.reader.loadFileInfo();
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.io;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -60,9 +61,11 @@ public class TestHalfStoreFileReader {
String root_dir = HBaseTestingUtility.getTestDir("TestHalfStoreFile").toString();
Path p = new Path(root_dir, "test");
FileSystem fs = FileSystem.get(test_util.getConfiguration());
Configuration conf = test_util.getConfiguration();
FileSystem fs = FileSystem.get(conf);
HFile.Writer w = new HFile.Writer(fs, p, 1024, "none", KeyValue.KEY_COMPARATOR);
HFile.Writer w = HFile.getWriterFactory(conf).createWriter(fs, p, 1024,
"none", KeyValue.KEY_COMPARATOR);
// write some things.
List<KeyValue> items = genSomeKeys();
@ -71,7 +74,7 @@ public class TestHalfStoreFileReader {
}
w.close();
HFile.Reader r = new HFile.Reader(fs, p, null, false, false);
HFile.Reader r = HFile.createReader(fs, p, null, false, false);
r.loadFileInfo();
byte [] midkey = r.midkey();
KeyValue midKV = KeyValue.createKeyValueFromKey(midkey);

View File

@ -67,9 +67,9 @@ public class RandomSeek {
Path path = new Path("/Users/ryan/rfile.big.txt");
long start = System.currentTimeMillis();
SimpleBlockCache cache = new SimpleBlockCache();
Reader reader = new HFile.Reader(lfs, path, cache, false, false);
Reader reader = HFile.createReader(lfs, path, cache, false, false);
reader.loadFileInfo();
System.out.println(reader.trailer);
System.out.println(reader.getTrailer());
long end = System.currentTimeMillis();
System.out.println("Index read time: " + (end - start));

View File

@ -20,8 +20,11 @@
package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.HeapSize;
import java.util.LinkedList;
import junit.framework.TestCase;
public class TestCachedBlockQueue extends TestCase {
@ -127,9 +130,13 @@ public class TestCachedBlockQueue extends TestCase {
private static class CachedBlock extends org.apache.hadoop.hbase.io.hfile.CachedBlock
{
public CachedBlock(long heapSize, String name, long accessTime) {
public CachedBlock(final long heapSize, String name, long accessTime) {
super(name,
ByteBuffer.allocate((int)(heapSize - CachedBlock.PER_BLOCK_OVERHEAD)),
new HeapSize(){
@Override
public long heapSize() {
return ((int)(heapSize - CachedBlock.PER_BLOCK_OVERHEAD));
}},
accessTime,false);
}
}

View File

@ -33,11 +33,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.BlockIndex;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.Writable;
/**
@ -63,9 +61,9 @@ public class TestHFile extends HBaseTestCase {
*/
public void testEmptyHFile() throws IOException {
Path f = new Path(ROOT_DIR, getName());
Writer w = new Writer(this.fs, f);
Writer w = HFile.getWriterFactory(conf).createWriter(this.fs, f);
w.close();
Reader r = new Reader(fs, f, null, false, false);
Reader r = HFile.createReader(fs, f, null, false, false);
r.loadFileInfo();
assertNull(r.getFirstKey());
assertNull(r.getLastKey());
@ -134,13 +132,13 @@ public class TestHFile extends HBaseTestCase {
void basicWithSomeCodec(String codec) throws IOException {
Path ncTFile = new Path(ROOT_DIR, "basic.hfile");
FSDataOutputStream fout = createFSOutput(ncTFile);
Writer writer = new Writer(fout, minBlockSize,
Compression.getCompressionAlgorithmByName(codec), null);
Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
minBlockSize, Compression.getCompressionAlgorithmByName(codec), null);
LOG.info(writer);
writeRecords(writer);
fout.close();
FSDataInputStream fin = fs.open(ncTFile);
Reader reader = new Reader(ncTFile, fs.open(ncTFile),
Reader reader = HFile.createReader(ncTFile, fs.open(ncTFile),
fs.getFileStatus(ncTFile).getLen(), null, false, false);
// Load up the index.
reader.loadFileInfo();
@ -209,13 +207,14 @@ public class TestHFile extends HBaseTestCase {
private void metablocks(final String compress) throws Exception {
Path mFile = new Path(ROOT_DIR, "meta.hfile");
FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = new Writer(fout, minBlockSize,
Compression.getCompressionAlgorithmByName(compress), null);
Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
minBlockSize, Compression.getCompressionAlgorithmByName(compress),
null);
someTestingWithMetaBlock(writer);
writer.close();
fout.close();
FSDataInputStream fin = fs.open(mFile);
Reader reader = new Reader(mFile, fs.open(mFile),
Reader reader = HFile.createReader(mFile, fs.open(mFile),
this.fs.getFileStatus(mFile).getLen(), null, false, false);
reader.loadFileInfo();
// No data -- this should return false.
@ -233,33 +232,35 @@ public class TestHFile extends HBaseTestCase {
}
public void testNullMetaBlocks() throws Exception {
Path mFile = new Path(ROOT_DIR, "nometa.hfile");
FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = new Writer(fout, minBlockSize,
Compression.Algorithm.NONE, null);
writer.append("foo".getBytes(), "value".getBytes());
writer.close();
fout.close();
Reader reader = new Reader(fs, mFile, null, false, false);
reader.loadFileInfo();
assertNull(reader.getMetaBlock("non-existant", false));
for (Compression.Algorithm compressAlgo :
HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile");
FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
minBlockSize, compressAlgo, null);
writer.append("foo".getBytes(), "value".getBytes());
writer.close();
fout.close();
Reader reader = HFile.createReader(fs, mFile, null, false, false);
reader.loadFileInfo();
assertNull(reader.getMetaBlock("non-existant", false));
}
}
/**
   * Make sure the ordinals for our compression libs don't change on us.
*/
public void testCompressionOrdinance() {
//assertTrue(Compression.Algorithm.LZO.ordinal() == 0);
assertTrue(Compression.Algorithm.LZO.ordinal() == 0);
assertTrue(Compression.Algorithm.GZ.ordinal() == 1);
assertTrue(Compression.Algorithm.NONE.ordinal() == 2);
}
public void testComparator() throws IOException {
Path mFile = new Path(ROOT_DIR, "meta.tfile");
FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = new Writer(fout, minBlockSize, (Compression.Algorithm) null,
new KeyComparator() {
Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
minBlockSize, (Compression.Algorithm) null, new KeyComparator() {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
@ -277,27 +278,4 @@ public class TestHFile extends HBaseTestCase {
writer.close();
}
/**
* Checks if the HeapSize calculator is within reason
*/
@SuppressWarnings("unchecked")
public void testHeapSizeForBlockIndex() throws IOException{
Class cl = null;
long expected = 0L;
long actual = 0L;
cl = BlockIndex.class;
expected = ClassSize.estimateBase(cl, false);
BlockIndex bi = new BlockIndex(Bytes.BYTES_RAWCOMPARATOR);
actual = bi.heapSize();
//Since the arrays in BlockIndex(byte [][] blockKeys, long [] blockOffsets,
//int [] blockDataSizes) are all null they are not going to show up in the
    //HeapSize calculation, so need to remove those array costs from expected.
expected -= ClassSize.align(3 * ClassSize.ARRAY);
if(expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
}
}

View File

@ -157,8 +157,8 @@ public class TestHFilePerformance extends TestCase {
if ("HFile".equals(fileType)){
System.out.println("HFile write method: ");
HFile.Writer writer =
new HFile.Writer(fout, minBlockSize, codecName, null);
HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
minBlockSize, codecName, null);
// Writing value in one shot.
for (long l=0 ; l<rows ; l++ ) {
@ -236,7 +236,7 @@ public class TestHFilePerformance extends TestCase {
FSDataInputStream fin = fs.open(path);
if ("HFile".equals(fileType)){
HFile.Reader reader = new HFile.Reader(path, fs.open(path),
HFile.Reader reader = HFile.createReader(path, fs.open(path),
fs.getFileStatus(path).getLen(), null, false, false);
reader.loadFileInfo();
switch (method) {

View File

@ -118,8 +118,8 @@ public class TestHFileSeek extends TestCase {
long totalBytes = 0;
FSDataOutputStream fout = createFSOutput(path, fs);
try {
Writer writer =
new Writer(fout, options.minBlockSize, options.compress, null);
Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
options.minBlockSize, options.compress, null);
try {
BytesWritable key = new BytesWritable();
BytesWritable val = new BytesWritable();
@ -163,8 +163,8 @@ public class TestHFileSeek extends TestCase {
int miss = 0;
long totalBytes = 0;
FSDataInputStream fsdis = fs.open(path);
Reader reader = new Reader(path, fsdis, fs.getFileStatus(path).getLen(),
null, false, false);
Reader reader = HFile.createReader(path, fsdis,
fs.getFileStatus(path).getLen(), null, false, false);
reader.loadFileInfo();
KeySampler kSampler =
new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),

View File

@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.hbase.io.HeapSize;
@ -43,11 +42,11 @@ public class TestLruBlockCache extends TestCase {
LruBlockCache cache = new LruBlockCache(maxSize,blockSize);
Block [] blocks = generateFixedBlocks(10, blockSize, "block");
CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
// Add all the blocks
for(Block block : blocks) {
cache.cacheBlock(block.blockName, block.buf);
for (CachedItem block : blocks) {
cache.cacheBlock(block.blockName, block);
}
// Let the eviction run
@ -70,35 +69,35 @@ public class TestLruBlockCache extends TestCase {
LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
Block [] blocks = generateRandomBlocks(100, blockSize);
CachedItem [] blocks = generateRandomBlocks(100, blockSize);
long expectedCacheSize = cache.heapSize();
// Confirm empty
for(Block block : blocks) {
for (CachedItem block : blocks) {
assertTrue(cache.getBlock(block.blockName, true) == null);
}
// Add blocks
for(Block block : blocks) {
cache.cacheBlock(block.blockName, block.buf);
expectedCacheSize += block.heapSize();
for (CachedItem block : blocks) {
cache.cacheBlock(block.blockName, block);
expectedCacheSize += block.cacheBlockHeapSize();
}
// Verify correctly calculated cache heap size
assertEquals(expectedCacheSize, cache.heapSize());
// Check if all blocks are properly cached and retrieved
for(Block block : blocks) {
ByteBuffer buf = cache.getBlock(block.blockName, true);
for (CachedItem block : blocks) {
HeapSize buf = cache.getBlock(block.blockName, true);
assertTrue(buf != null);
assertEquals(buf.capacity(), block.buf.capacity());
assertEquals(buf.heapSize(), block.heapSize());
}
// Re-add same blocks and ensure nothing has changed
for(Block block : blocks) {
for (CachedItem block : blocks) {
try {
cache.cacheBlock(block.blockName, block.buf);
cache.cacheBlock(block.blockName, block);
assertTrue("Cache should not allow re-caching a block", false);
} catch(RuntimeException re) {
// expected
@ -109,10 +108,10 @@ public class TestLruBlockCache extends TestCase {
assertEquals(expectedCacheSize, cache.heapSize());
// Check if all blocks are properly cached and retrieved
for(Block block : blocks) {
ByteBuffer buf = cache.getBlock(block.blockName, true);
for (CachedItem block : blocks) {
HeapSize buf = cache.getBlock(block.blockName, true);
assertTrue(buf != null);
assertEquals(buf.capacity(), block.buf.capacity());
assertEquals(buf.heapSize(), block.heapSize());
}
// Expect no evictions
@ -129,14 +128,14 @@ public class TestLruBlockCache extends TestCase {
LruBlockCache cache = new LruBlockCache(maxSize,blockSize,false);
Block [] blocks = generateFixedBlocks(10, blockSize, "block");
CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
long expectedCacheSize = cache.heapSize();
// Add all the blocks
for(Block block : blocks) {
cache.cacheBlock(block.blockName, block.buf);
expectedCacheSize += block.heapSize();
for (CachedItem block : blocks) {
cache.cacheBlock(block.blockName, block);
expectedCacheSize += block.cacheBlockHeapSize();
}
// A single eviction run should have occurred
@ -158,7 +157,7 @@ public class TestLruBlockCache extends TestCase {
assertTrue(cache.getBlock(blocks[1].blockName, true) == null);
for(int i=2;i<blocks.length;i++) {
assertEquals(cache.getBlock(blocks[i].blockName, true),
blocks[i].buf);
blocks[i]);
}
}
@ -169,21 +168,21 @@ public class TestLruBlockCache extends TestCase {
LruBlockCache cache = new LruBlockCache(maxSize,blockSize,false);
Block [] singleBlocks = generateFixedBlocks(5, 10000, "single");
Block [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
CachedItem [] singleBlocks = generateFixedBlocks(5, 10000, "single");
CachedItem [] multiBlocks = generateFixedBlocks(5, 10000, "multi");
long expectedCacheSize = cache.heapSize();
// Add and get the multi blocks
for(Block block : multiBlocks) {
cache.cacheBlock(block.blockName, block.buf);
expectedCacheSize += block.heapSize();
assertEquals(cache.getBlock(block.blockName, true), block.buf);
for (CachedItem block : multiBlocks) {
cache.cacheBlock(block.blockName, block);
expectedCacheSize += block.cacheBlockHeapSize();
assertEquals(cache.getBlock(block.blockName, true), block);
}
// Add the single blocks (no get)
for(Block block : singleBlocks) {
cache.cacheBlock(block.blockName, block.buf);
for (CachedItem block : singleBlocks) {
cache.cacheBlock(block.blockName, block);
expectedCacheSize += block.heapSize();
}
@ -214,9 +213,9 @@ public class TestLruBlockCache extends TestCase {
// And all others to be cached
for(int i=1;i<4;i++) {
assertEquals(cache.getBlock(singleBlocks[i].blockName, true),
singleBlocks[i].buf);
singleBlocks[i]);
assertEquals(cache.getBlock(multiBlocks[i].blockName, true),
multiBlocks[i].buf);
multiBlocks[i]);
}
}
@ -236,9 +235,9 @@ public class TestLruBlockCache extends TestCase {
0.34f);// memory
Block [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
Block [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
Block [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
CachedItem [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
long expectedCacheSize = cache.heapSize();
@ -246,17 +245,17 @@ public class TestLruBlockCache extends TestCase {
for(int i=0;i<3;i++) {
// Just add single blocks
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
expectedCacheSize += singleBlocks[i].heapSize();
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
// Add and get multi blocks
cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
expectedCacheSize += multiBlocks[i].heapSize();
cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i]);
expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
cache.getBlock(multiBlocks[i].blockName, true);
// Add memory blocks as such
cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
expectedCacheSize += memoryBlocks[i].heapSize();
cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i], true);
expectedCacheSize += memoryBlocks[i].cacheBlockHeapSize();
}
@ -267,7 +266,7 @@ public class TestLruBlockCache extends TestCase {
assertEquals(expectedCacheSize, cache.heapSize());
// Insert a single block, oldest single should be evicted
cache.cacheBlock(singleBlocks[3].blockName, singleBlocks[3].buf);
cache.cacheBlock(singleBlocks[3].blockName, singleBlocks[3]);
// Single eviction, one thing evicted
assertEquals(1, cache.getEvictionCount());
@ -280,7 +279,7 @@ public class TestLruBlockCache extends TestCase {
cache.getBlock(singleBlocks[1].blockName, true);
// Insert another single block
cache.cacheBlock(singleBlocks[4].blockName, singleBlocks[4].buf);
cache.cacheBlock(singleBlocks[4].blockName, singleBlocks[4]);
// Two evictions, two evicted.
assertEquals(2, cache.getEvictionCount());
@ -290,7 +289,7 @@ public class TestLruBlockCache extends TestCase {
assertEquals(null, cache.getBlock(multiBlocks[0].blockName, true));
// Insert another memory block
cache.cacheBlock(memoryBlocks[3].blockName, memoryBlocks[3].buf, true);
cache.cacheBlock(memoryBlocks[3].blockName, memoryBlocks[3], true);
// Three evictions, three evicted.
assertEquals(3, cache.getEvictionCount());
@ -300,8 +299,8 @@ public class TestLruBlockCache extends TestCase {
assertEquals(null, cache.getBlock(memoryBlocks[0].blockName, true));
// Add a block that is twice as big (should force two evictions)
Block [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
cache.cacheBlock(bigBlocks[0].blockName, bigBlocks[0].buf);
CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
cache.cacheBlock(bigBlocks[0].blockName, bigBlocks[0]);
// Four evictions, six evicted (inserted block 3X size, expect +3 evicted)
assertEquals(4, cache.getEvictionCount());
@ -316,7 +315,7 @@ public class TestLruBlockCache extends TestCase {
cache.getBlock(bigBlocks[0].blockName, true);
// Cache another single big block
cache.cacheBlock(bigBlocks[1].blockName, bigBlocks[1].buf);
cache.cacheBlock(bigBlocks[1].blockName, bigBlocks[1]);
// Five evictions, nine evicted (3 new)
assertEquals(5, cache.getEvictionCount());
@ -328,7 +327,7 @@ public class TestLruBlockCache extends TestCase {
assertEquals(null, cache.getBlock(multiBlocks[2].blockName, true));
// Cache a big memory block
cache.cacheBlock(bigBlocks[2].blockName, bigBlocks[2].buf, true);
cache.cacheBlock(bigBlocks[2].blockName, bigBlocks[2], true);
// Six evictions, twelve evicted (3 new)
assertEquals(6, cache.getEvictionCount());
@ -358,18 +357,18 @@ public class TestLruBlockCache extends TestCase {
0.33f, // multi
0.34f);// memory
Block [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
Block [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
// Add 5 multi blocks
for(Block block : multiBlocks) {
cache.cacheBlock(block.blockName, block.buf);
for (CachedItem block : multiBlocks) {
cache.cacheBlock(block.blockName, block);
cache.getBlock(block.blockName, true);
}
// Add 5 single blocks
for(int i=0;i<5;i++) {
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
}
// An eviction ran
@ -392,7 +391,7 @@ public class TestLruBlockCache extends TestCase {
// 12 more evicted.
for(int i=5;i<18;i++) {
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
}
// 4 total evictions, 16 total evicted
@ -420,22 +419,22 @@ public class TestLruBlockCache extends TestCase {
0.33f, // multi
0.34f);// memory
Block [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
Block [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
Block [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
// Add all blocks from all priorities
for(int i=0;i<10;i++) {
// Just add single blocks
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf);
cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i]);
// Add and get multi blocks
cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf);
cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i]);
cache.getBlock(multiBlocks[i].blockName, true);
// Add memory blocks as such
cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true);
cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i], true);
}
// Do not expect any evictions yet
@ -459,29 +458,29 @@ public class TestLruBlockCache extends TestCase {
// And the newest 5 blocks should still be accessible
for(int i=5;i<10;i++) {
assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName, true));
assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName, true));
assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName, true));
assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].blockName, true));
assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].blockName, true));
assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].blockName, true));
}
}
private Block [] generateFixedBlocks(int numBlocks, int size, String pfx) {
Block [] blocks = new Block[numBlocks];
private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
CachedItem [] blocks = new CachedItem[numBlocks];
for(int i=0;i<numBlocks;i++) {
blocks[i] = new Block(pfx + i, size);
blocks[i] = new CachedItem(pfx + i, size);
}
return blocks;
}
private Block [] generateFixedBlocks(int numBlocks, long size, String pfx) {
private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
return generateFixedBlocks(numBlocks, (int)size, pfx);
}
private Block [] generateRandomBlocks(int numBlocks, long maxSize) {
Block [] blocks = new Block[numBlocks];
private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
CachedItem [] blocks = new CachedItem[numBlocks];
Random r = new Random();
for(int i=0;i<numBlocks;i++) {
blocks[i] = new Block("block" + i, r.nextInt((int)maxSize)+1);
blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
}
return blocks;
}
@ -511,19 +510,26 @@ public class TestLruBlockCache extends TestCase {
LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
}
private static class Block implements HeapSize {
private static class CachedItem implements HeapSize {
String blockName;
ByteBuffer buf;
int size;
Block(String blockName, int size) {
CachedItem(String blockName, int size) {
this.blockName = blockName;
this.buf = ByteBuffer.allocate(size);
this.size = size;
}
/** The size of this item reported to the block cache layer */
@Override
public long heapSize() {
return CachedBlock.PER_BLOCK_OVERHEAD +
ClassSize.align(blockName.length()) +
ClassSize.align(buf.capacity());
return ClassSize.align(size);
}
/** Size of the cache block holding this item. Used for verification. */
public long cacheBlockHeapSize() {
return CachedBlock.PER_BLOCK_OVERHEAD
+ ClassSize.align(blockName.length())
+ ClassSize.align(size);
}
}
}

View File

@ -42,7 +42,8 @@ public class TestReseekTo {
Path ncTFile = new Path(HBaseTestingUtility.getTestDir(), "basic.hfile");
FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
HFile.Writer writer = new HFile.Writer(fout, 4000, "none", null);
HFile.Writer writer = HFile.getWriterFactory(
TEST_UTIL.getConfiguration()).createWriter(fout, 4000, "none", null);
int numberOfKeys = 1000;
String valueString = "Value";
@ -59,7 +60,7 @@ public class TestReseekTo {
writer.close();
fout.close();
HFile.Reader reader = new HFile.Reader(TEST_UTIL.getTestFileSystem(),
HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(),
ncTFile, null, false, false);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, true);

View File

@ -45,7 +45,8 @@ public class TestSeekTo extends HBaseTestCase {
Path ncTFile = new Path(this.testDir, "basic.hfile");
FSDataOutputStream fout = this.fs.create(ncTFile);
int blocksize = toKV("a").getLength() * 3;
HFile.Writer writer = new HFile.Writer(fout, blocksize, "none", null);
HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fout,
blocksize, "none", null);
// 4 bytes * 3 * 2 for each key/value +
// 3 for keys, 15 for values = 42 (woot)
writer.append(toKV("c"));
@ -58,9 +59,10 @@ public class TestSeekTo extends HBaseTestCase {
fout.close();
return ncTFile;
}
public void testSeekBefore() throws Exception {
Path p = makeNewFile();
HFile.Reader reader = new HFile.Reader(fs, p, null, false, false);
HFile.Reader reader = HFile.createReader(fs, p, null, false, false);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, true);
assertEquals(false, scanner.seekBefore(toKV("a").getKey()));
@ -93,9 +95,9 @@ public class TestSeekTo extends HBaseTestCase {
public void testSeekTo() throws Exception {
Path p = makeNewFile();
HFile.Reader reader = new HFile.Reader(fs, p, null, false, false);
HFile.Reader reader = HFile.createReader(fs, p, null, false, false);
reader.loadFileInfo();
assertEquals(2, reader.blockIndex.count);
assertEquals(2, reader.getDataBlockIndexReader().getRootBlockCount());
HFileScanner scanner = reader.getScanner(false, true);
// lies before the start of the file.
assertEquals(-1, scanner.seekTo(toKV("a").getKey()));
@ -113,30 +115,32 @@ public class TestSeekTo extends HBaseTestCase {
public void testBlockContainingKey() throws Exception {
Path p = makeNewFile();
HFile.Reader reader = new HFile.Reader(fs, p, null, false, false);
HFile.Reader reader = HFile.createReader(fs, p, null, false, false);
reader.loadFileInfo();
System.out.println(reader.blockIndex.toString());
HFileBlockIndex.BlockIndexReader blockIndexReader =
reader.getDataBlockIndexReader();
System.out.println(blockIndexReader.toString());
int klen = toKV("a").getKey().length;
// falls before the start of the file.
assertEquals(-1, reader.blockIndex.blockContainingKey(toKV("a").getKey(),
0, klen));
assertEquals(0, reader.blockIndex.blockContainingKey(toKV("c").getKey(), 0,
klen));
assertEquals(0, reader.blockIndex.blockContainingKey(toKV("d").getKey(), 0,
klen));
assertEquals(0, reader.blockIndex.blockContainingKey(toKV("e").getKey(), 0,
klen));
assertEquals(0, reader.blockIndex.blockContainingKey(toKV("g").getKey(), 0,
klen));
assertEquals(0, reader.blockIndex.blockContainingKey(toKV("h").getKey(), 0,
klen));
assertEquals(1, reader.blockIndex.blockContainingKey(toKV("i").getKey(), 0,
klen));
assertEquals(1, reader.blockIndex.blockContainingKey(toKV("j").getKey(), 0,
klen));
assertEquals(1, reader.blockIndex.blockContainingKey(toKV("k").getKey(), 0,
klen));
assertEquals(1, reader.blockIndex.blockContainingKey(toKV("l").getKey(), 0,
klen));
}
assertEquals(-1, blockIndexReader.rootBlockContainingKey(
toKV("a").getKey(), 0, klen));
assertEquals(0, blockIndexReader.rootBlockContainingKey(
toKV("c").getKey(), 0, klen));
assertEquals(0, blockIndexReader.rootBlockContainingKey(
toKV("d").getKey(), 0, klen));
assertEquals(0, blockIndexReader.rootBlockContainingKey(
toKV("e").getKey(), 0, klen));
assertEquals(0, blockIndexReader.rootBlockContainingKey(
toKV("g").getKey(), 0, klen));
assertEquals(0, blockIndexReader.rootBlockContainingKey(
toKV("h").getKey(), 0, klen));
assertEquals(1, blockIndexReader.rootBlockContainingKey(
toKV("i").getKey(), 0, klen));
assertEquals(1, blockIndexReader.rootBlockContainingKey(
toKV("j").getKey(), 0, klen));
assertEquals(1, blockIndexReader.rootBlockContainingKey(
toKV("k").getKey(), 0, klen));
assertEquals(1, blockIndexReader.rootBlockContainingKey(
toKV("l").getKey(), 0, klen));
}
}

View File

@ -283,7 +283,7 @@ public class TestHFileOutputFormat {
FileStatus[] file = fs.listStatus(sub3[0].getPath());
// open as HFile Reader and pull out TIMERANGE FileInfo.
HFile.Reader rd = new HFile.Reader(fs, file[0].getPath(), null, true,
HFile.Reader rd = HFile.createReader(fs, file[0].getPath(), null, true,
false);
Map<byte[],byte[]> finfo = rd.loadFileInfo();
byte[] range = finfo.get("TIMERANGE".getBytes());
@ -578,6 +578,9 @@ public class TestHFileOutputFormat {
try {
// partial map red setup to get an operational writer for testing
// We turn off the sequence file compression, because DefaultCodec
// pollutes the GZip codec pool with an incompatible compressor.
conf.set("io.seqfile.compression.type", "NONE");
Job job = new Job(conf, "testLocalMRIncrementalLoad");
setupRandomGeneratorMapper(job);
HFileOutputFormat.configureIncrementalLoad(job, table);
@ -607,7 +610,8 @@ public class TestHFileOutputFormat {
// verify that the compression on this file matches the configured
// compression
Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
Reader reader = new HFile.Reader(fileSystem, dataFilePath, null, false, true);
Reader reader = HFile.createReader(fileSystem, dataFilePath, null,
false, true);
reader.loadFileInfo();
assertEquals("Incorrect compression used for column family " + familyStr
+ "(reader: " + reader + ")",

View File

@ -101,8 +101,8 @@ public class TestLoadIncrementalHFiles {
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
FAMILY, QUALIFIER, from, to, 1000);
createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
}
int expectedRows = hfileIdx * 1000;
@ -132,7 +132,7 @@ public class TestLoadIncrementalHFiles {
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
createHFile(fs, testIn, FAMILY, QUALIFIER,
createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
Path bottomOut = new Path(dir, "bottom.out");
@ -151,7 +151,7 @@ public class TestLoadIncrementalHFiles {
private int verifyHFile(Path p) throws IOException {
Configuration conf = util.getConfiguration();
HFile.Reader reader = new HFile.Reader(
HFile.Reader reader = HFile.createReader(
p.getFileSystem(conf), p, null, false, false);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, false);
@ -171,11 +171,13 @@ public class TestLoadIncrementalHFiles {
* TODO put me in an HFileTestUtil or something?
*/
static void createHFile(
Configuration conf,
FileSystem fs, Path path,
byte[] family, byte[] qualifier,
byte[] startKey, byte[] endKey, int numRows) throws IOException
{
HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fs, path,
BLOCKSIZE, COMPRESSION,
KeyValue.KEY_COMPARATOR);
long now = System.currentTimeMillis();
try {

View File

@ -70,7 +70,8 @@ public class TestFSErrorsExposed {
HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
"regionname"), "familyname");
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2*1024);
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2*1024,
util.getConfiguration());
TestStoreFile.writeStoreFile(
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
@ -111,7 +112,8 @@ public class TestFSErrorsExposed {
HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
"regionname"), "familyname");
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024);
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024,
util.getConfiguration());
TestStoreFile.writeStoreFile(
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.util.Progressable;
import org.mockito.Mockito;
import com.google.common.base.Joiner;
@ -204,7 +206,7 @@ public class TestStore extends TestCase {
Configuration c = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(c);
StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
StoreFile.DEFAULT_BLOCKSIZE_SMALL);
StoreFile.DEFAULT_BLOCKSIZE_SMALL, c);
w.appendMetadata(seqid + 1, false);
w.close();
this.store.close();
@ -571,6 +573,14 @@ public class TestStore extends TestCase {
return new FaultyOutputStream(super.create(p), faultPos);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return new FaultyOutputStream(super.create(f, permission,
overwrite, bufferSize, replication, blockSize, progress), faultPos);
}
}
static class FaultyOutputStream extends FSDataOutputStream {

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.mockito.Mockito;
@ -87,7 +88,8 @@ public class TestStoreFile extends HBaseTestCase {
public void testBasicHalfMapFile() throws Exception {
// Make up a directory hierarchy that has a regiondir and familyname.
StoreFile.Writer writer = StoreFile.createWriter(this.fs,
new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024);
new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024,
conf);
writeStoreFile(writer);
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf,
StoreFile.BloomType.NONE, false));
@ -127,7 +129,8 @@ public class TestStoreFile extends HBaseTestCase {
Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname");
Path dir = new Path(storedir, "1234567890");
// Make a store file and write data to it.
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024,
conf);
writeStoreFile(writer);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
StoreFile.BloomType.NONE, false);
@ -197,8 +200,11 @@ public class TestStoreFile extends HBaseTestCase {
(topScanner.isSeeked() && topScanner.next())) {
key = topScanner.getKey();
assertTrue(topScanner.getReader().getComparator().compare(key.array(),
key.arrayOffset(), key.limit(), midkey, 0, midkey.length) >= 0);
if (topScanner.getReader().getComparator().compare(key.array(),
key.arrayOffset(), key.limit(), midkey, 0, midkey.length) < 0) {
fail("key=" + Bytes.toStringBinary(key) + " < midkey=" +
Bytes.toStringBinary(midkey));
}
if (first) {
first = false;
LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
@ -327,7 +333,8 @@ public class TestStoreFile extends HBaseTestCase {
private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs)
throws Exception {
float err = conf.getFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
float err = conf.getFloat(
BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
Path f = writer.getPath();
long now = System.currentTimeMillis();
for (int i = 0; i < 2000; i += 2) {
@ -362,25 +369,24 @@ public class TestStoreFile extends HBaseTestCase {
}
reader.close();
fs.delete(f, true);
System.out.println("False negatives: " + falseNeg);
assertEquals(0, falseNeg);
System.out.println("False positives: " + falsePos);
if (!(falsePos <= 2* 2000 * err)) {
System.out.println("WTFBBQ! " + falsePos + ", " + (2* 2000 * err) );
}
assertTrue(falsePos <= 2* 2000 * err);
assertEquals("False negatives: " + falseNeg, 0, falseNeg);
int maxFalsePos = (int) (2 * 2000 * err);
assertTrue("Too many false positives: " + falsePos + " (err=" + err
+ ", expected no more than " + maxFalsePos + ")",
falsePos <= maxFalsePos);
}
public void testBloomFilter() throws Exception {
FileSystem fs = FileSystem.getLocal(conf);
conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE,
(float) 0.01);
conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
// write the file
Path f = new Path(ROOT_DIR, getName());
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000, false);
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
bloomWriteRead(writer, fs);
}
@ -388,8 +394,8 @@ public class TestStoreFile extends HBaseTestCase {
public void testBloomTypes() throws Exception {
float err = (float) 0.01;
FileSystem fs = FileSystem.getLocal(conf);
conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, err);
conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
int rowCount = 50;
int colCount = 10;
@ -411,7 +417,7 @@ public class TestStoreFile extends HBaseTestCase {
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, bt[x], expKeys[x], false);
conf, KeyValue.COMPARATOR, bt[x], expKeys[x]);
long now = System.currentTimeMillis();
for (int i = 0; i < rowCount*2; i += 2) { // rows
@ -471,19 +477,23 @@ public class TestStoreFile extends HBaseTestCase {
float err = (float)0.005;
FileSystem fs = FileSystem.getLocal(conf);
Path f = new Path(ROOT_DIR, getName());
conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, err);
conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, 1000);
conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS, 1000);
// This test only runs for HFile format version 1.
conf.setInt(HFile.FORMAT_VERSION_KEY, 1);
// this should not create a bloom because the max keys is too small
StoreFile.Writer writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000, false);
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
assertFalse(writer.hasBloom());
writer.close();
fs.delete(f, true);
conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, Integer.MAX_VALUE);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS,
Integer.MAX_VALUE);
// TODO: commented out because we run out of java heap space on trunk
/*
@ -500,8 +510,7 @@ public class TestStoreFile extends HBaseTestCase {
// because Java can't create a contiguous array > MAX_INT
writer = new StoreFile.Writer(fs, f,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE,
false);
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE);
assertFalse(writer.hasBloom());
writer.close();
fs.delete(f, true);
@ -556,7 +565,7 @@ public class TestStoreFile extends HBaseTestCase {
}
/**
*Generate a list of KeyValues for testing based on given parameters
* Generate a list of KeyValues for testing based on given parameters
* @param timestamps
* @param numRows
* @param qualifier
@ -592,7 +601,8 @@ public class TestStoreFile extends HBaseTestCase {
Path storedir = new Path(new Path(this.testDir, "regionname"),
"familyname");
Path dir = new Path(storedir, "1234567890");
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024,
conf);
List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
family, qualifier);
@ -645,7 +655,7 @@ public class TestStoreFile extends HBaseTestCase {
long startEvicted = cs.getEvictedCount();
// Let's write a StoreFile with three blocks, with cache on write off
conf.setBoolean("hbase.rs.cacheblocksonwrite", false);
conf.setBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY, false);
Path pathCowOff = new Path(baseDir, "123456789");
StoreFile.Writer writer = writeStoreFile(conf, pathCowOff, 3);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
@ -666,7 +676,7 @@ public class TestStoreFile extends HBaseTestCase {
reader.close();
// Now write a StoreFile with three blocks, with cache on write on
conf.setBoolean("hbase.rs.cacheblocksonwrite", true);
conf.setBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY, true);
Path pathCowOn = new Path(baseDir, "123456788");
writer = writeStoreFile(conf, pathCowOn, 3);
hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
@ -702,6 +712,12 @@ public class TestStoreFile extends HBaseTestCase {
while ((kv1 = scannerOne.next()) != null) {
kv2 = scannerTwo.next();
assertTrue(kv1.equals(kv2));
assertTrue(Bytes.compareTo(
kv1.getBuffer(), kv1.getKeyOffset(), kv1.getKeyLength(),
kv2.getBuffer(), kv2.getKeyOffset(), kv2.getKeyLength()) == 0);
assertTrue(Bytes.compareTo(
kv1.getBuffer(), kv1.getValueOffset(), kv1.getValueLength(),
kv2.getBuffer(), kv2.getValueOffset(), kv2.getValueLength()) == 0);
}
assertNull(scannerTwo.next());
assertEquals(startHit + 6, cs.getHitCount());
@ -755,8 +771,7 @@ public class TestStoreFile extends HBaseTestCase {
int blockSize = totalSize / numBlocks;
StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
conf, KeyValue.COMPARATOR, StoreFile.BloomType.NONE, 2000,
conf.getBoolean("hbase.rs.cacheblocksonwrite", false));
conf, KeyValue.COMPARATOR, StoreFile.BloomType.NONE, 2000);
// We'll write N-1 KVs to ensure we don't write an extra block
kvs.remove(kvs.size()-1);
for (KeyValue kv : kvs) {


@ -199,7 +199,7 @@ public class TestWALReplay {
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
Path f = new Path(basedir, "hfile");
HFile.Writer writer = new HFile.Writer(this.fs, f);
HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(this.fs, f);
byte [] family = htd.getFamilies().iterator().next().getName();
byte [] row = Bytes.toBytes(tableNameStr);
writer.append(new KeyValue(row, family, family, row));
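
Note: this hunk reflects the new writer-creation path: HFile.Writer instances now come from HFile.getWriterFactory(conf) instead of a direct constructor. A minimal sketch of the pattern, assuming fs, f, conf, row, and family are already set up as in the test above:

// Sketch only; variable names mirror the surrounding test code.
HFile.Writer w = HFile.getWriterFactory(conf).createWriter(fs, f);
w.append(new KeyValue(row, family, family, row));
w.close();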


@ -23,46 +23,45 @@ package org.apache.hadoop.hbase.util;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
import junit.framework.TestCase;
public class TestByteBloomFilter extends TestCase {
public void testBasicBloom() throws Exception {
ByteBloomFilter bf1 = new ByteBloomFilter(1000, (float)0.01, Hash.MURMUR_HASH, 0);
ByteBloomFilter bf2 = new ByteBloomFilter(1000, (float)0.01, Hash.MURMUR_HASH, 0);
bf1.allocBloom();
bf2.allocBloom();
// test 1: verify no fundamental false negatives or positives
byte[] key1 = {1,2,3,4,5,6,7,8,9};
byte[] key2 = {1,2,3,4,5,6,7,8,7};
bf1.add(key1);
bf2.add(key2);
assertTrue(bf1.contains(key1));
assertFalse(bf1.contains(key2));
assertFalse(bf2.contains(key1));
assertTrue(bf2.contains(key2));
byte [] bkey = {1,2,3,4};
byte [] bval = "this is a much larger byte array".getBytes();
bf1.add(bkey);
bf1.add(bval, 1, bval.length-1);
assertTrue( bf1.contains(bkey) );
assertTrue( bf1.contains(bval, 1, bval.length-1) );
assertFalse( bf1.contains(bval) );
assertFalse( bf1.contains(bval) );
// test 2: serialization & deserialization.
// test 2: serialization & deserialization.
// (convert bloom to byte array & read byte array back in as input)
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
bf1.writeBloom(new DataOutputStream(bOut));
ByteBuffer bb = ByteBuffer.wrap(bOut.toByteArray());
ByteBuffer bb = ByteBuffer.wrap(bOut.toByteArray());
ByteBloomFilter newBf1 = new ByteBloomFilter(1000, (float)0.01,
Hash.MURMUR_HASH, 0);
assertTrue(newBf1.contains(key1, bb));
@ -71,16 +70,17 @@ public class TestByteBloomFilter extends TestCase {
assertTrue( newBf1.contains(bval, 1, bval.length-1, bb) );
assertFalse( newBf1.contains(bval, bb) );
assertFalse( newBf1.contains(bval, bb) );
System.out.println("Serialized as " + bOut.size() + " bytes");
assertTrue(bOut.size() - bf1.byteSize < 10); //... allow small padding
}
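
A condensed sketch of the serialize-then-query round trip exercised above, using only calls that appear in this test (same imports as TestByteBloomFilter assumed; the "row-1" key is arbitrary):

ByteBloomFilter bf = new ByteBloomFilter(1000, (float) 0.01, Hash.MURMUR_HASH, 0);
bf.allocBloom();
bf.add(Bytes.toBytes("row-1"));

ByteArrayOutputStream out = new ByteArrayOutputStream();
bf.writeBloom(new DataOutputStream(out));              // serialize the bitmap
ByteBuffer bits = ByteBuffer.wrap(out.toByteArray());  // wrap it for querying

// A second filter built with the same parameters is used to query the
// serialized bits, mirroring how the test reads newBf1 against bb.
ByteBloomFilter meta = new ByteBloomFilter(1000, (float) 0.01, Hash.MURMUR_HASH, 0);
boolean maybePresent = meta.contains(Bytes.toBytes("row-1"), bits);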
public void testBloomFold() throws Exception {
// test: foldFactor < log(max/actual)
ByteBloomFilter b = new ByteBloomFilter(1003, (float)0.01, Hash.MURMUR_HASH, 2);
ByteBloomFilter b = new ByteBloomFilter(1003, (float) 0.01,
Hash.MURMUR_HASH, 2);
b.allocBloom();
int origSize = b.getByteSize();
long origSize = b.getByteSize();
assertEquals(1204, origSize);
for (int i = 0; i < 12; ++i) {
b.add(Bytes.toBytes(i));
@ -106,7 +106,7 @@ public class TestByteBloomFilter extends TestCase {
ByteBloomFilter b = new ByteBloomFilter(10*1000*1000, (float)err, Hash.MURMUR_HASH, 3);
b.allocBloom();
long startTime = System.currentTimeMillis();
int origSize = b.getByteSize();
long origSize = b.getByteSize();
for (int i = 0; i < 1*1000*1000; ++i) {
b.add(Bytes.toBytes(i));
}
@ -119,12 +119,12 @@ public class TestByteBloomFilter extends TestCase {
endTime = System.currentTimeMillis();
System.out.println("Total Fold time = " + (endTime - startTime) + "ms");
assertTrue(origSize >= b.getByteSize()<<3);
// test
startTime = System.currentTimeMillis();
int falsePositives = 0;
for (int i = 0; i < 2*1000*1000; ++i) {
if (b.contains(Bytes.toBytes(i))) {
if(i >= 1*1000*1000) falsePositives++;
} else {
@ -138,4 +138,27 @@ public class TestByteBloomFilter extends TestCase {
// test: foldFactor > log(max/actual)
}
public void testSizing() {
int bitSize = 8 * 128 * 1024; // 128 KB
double errorRate = 0.025; // target false positive rate
// How many keys can we store in a Bloom filter of this size while
// maintaining the given false positive rate, not taking into account
// that the number of hash functions has to be an integer.
long maxKeys = ByteBloomFilter.idealMaxKeys(bitSize, errorRate);
assertEquals(136570, maxKeys);
// A reverse operation: how many bits would we need to store this many keys
// and keep the same low false positive rate?
long bitSize2 = ByteBloomFilter.computeBitSize(maxKeys, errorRate);
// The bit size comes out a little different due to rounding.
assertTrue(Math.abs(bitSize2 - bitSize) * 1.0 / bitSize < 1e-5);
}
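
The numbers asserted in testSizing follow from the standard Bloom filter sizing formulas, assuming idealMaxKeys and computeBitSize implement n = -m * (ln 2)^2 / ln(p) and m = -n * ln(p) / (ln 2)^2. A hypothetical standalone reproduction of the 136570 figure:

public class BloomSizingCheck {
  public static void main(String[] args) {
    long m = 8L * 128 * 1024;   // 1,048,576 bits (128 KB)
    double p = 0.025;           // target false-positive rate
    long n = (long) (m * Math.pow(Math.log(2), 2) / -Math.log(p));
    System.out.println(n);      // prints 136570
    long m2 = (long) Math.ceil(-n * Math.log(p) / Math.pow(Math.log(2), 2));
    System.out.println(m2);     // close to the original bit size, modulo rounding
  }
}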
public void testFoldableByteSize() {
assertEquals(128, ByteBloomFilter.computeFoldableByteSize(1000, 5));
assertEquals(640, ByteBloomFilter.computeFoldableByteSize(5001, 4));
}
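
The expected values in testFoldableByteSize are consistent with computeFoldableByteSize rounding the bit count up to a whole number of bytes divisible by 2^foldFactor (an assumption about the implementation, but it reproduces both assertions: 1000 bits is 125 bytes, rounded up to a multiple of 32 gives 128; 5001 bits is 626 bytes, rounded up to a multiple of 16 gives 640). A hypothetical helper illustrating that rule:

public class FoldableSizeCheck {
  static long foldableByteSize(long bitSize, int foldFactor) {
    long bytes = (bitSize + 7) / 8;            // round bits up to whole bytes
    long unit = 1L << foldFactor;              // byte size must be divisible by 2^foldFactor
    return ((bytes + unit - 1) / unit) * unit; // round up to a multiple of that unit
  }

  public static void main(String[] args) {
    System.out.println(foldableByteSize(1000, 5)); // 128
    System.out.println(foldableByteSize(5001, 4)); // 640
  }
}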
}


@ -19,6 +19,10 @@
*/
package org.apache.hadoop.hbase.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
@ -142,6 +146,8 @@ public class TestBytes extends TestCase {
byte [] key2 = {4,9};
byte [] key2_2 = {4};
byte [] key3 = {5,11};
byte [] key4 = {0};
byte [] key5 = {2};
assertEquals(1, Bytes.binarySearch(arr, key1, 0, 1,
Bytes.BYTES_RAWCOMPARATOR));
@ -157,8 +163,22 @@ public class TestBytes extends TestCase {
Bytes.BYTES_RAWCOMPARATOR));
assertEquals(5, Bytes.binarySearch(arr, key3, 1, 1,
Bytes.BYTES_RAWCOMPARATOR));
assertEquals(-1,
Bytes.binarySearch(arr, key4, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
assertEquals(-2,
Bytes.binarySearch(arr, key5, 0, 1, Bytes.BYTES_RAWCOMPARATOR));
// Search for values to the left and to the right of each item in the array.
for (int i = 0; i < arr.length; ++i) {
assertEquals(-(i + 1), Bytes.binarySearch(arr,
new byte[] { (byte) (arr[i][0] - 1) }, 0, 1,
Bytes.BYTES_RAWCOMPARATOR));
assertEquals(-(i + 2), Bytes.binarySearch(arr,
new byte[] { (byte) (arr[i][0] + 1) }, 0, 1,
Bytes.BYTES_RAWCOMPARATOR));
}
}
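
The new assertions are consistent with the usual binary-search miss encoding: a missing key yields -(insertionPoint + 1), the same convention as java.util.Arrays.binarySearch, so a key smaller than every element returns -1 and one that would land after element i returns -(i + 2). A quick illustration with the JDK method (Bytes.binarySearch is assumed to follow the same convention, as the assertions above suggest):

import java.util.Arrays;

public class BinarySearchMissDemo {
  public static void main(String[] args) {
    int[] sorted = {1, 3, 5};
    System.out.println(Arrays.binarySearch(sorted, 0)); // -1 (would insert at 0)
    System.out.println(Arrays.binarySearch(sorted, 2)); // -2 (would insert at 1)
    System.out.println(Arrays.binarySearch(sorted, 6)); // -4 (would insert at 3)
  }
}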
public void testStartsWith() {
assertTrue(Bytes.startsWith(Bytes.toBytes("hello"), Bytes.toBytes("h")));
assertTrue(Bytes.startsWith(Bytes.toBytes("hello"), Bytes.toBytes("")));
@ -202,4 +222,30 @@ public class TestBytes extends TestCase {
return (Bytes.toLong(testValue) + amount) == incrementResult;
}
public void testFixedSizeString() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
Bytes.writeStringFixedSize(dos, "Hello", 5);
Bytes.writeStringFixedSize(dos, "World", 18);
Bytes.writeStringFixedSize(dos, "", 9);
try {
// Use a long dash which is three bytes in UTF-8. If encoding happens
// using ISO-8859-1, this will fail.
Bytes.writeStringFixedSize(dos, "Too\u2013Long", 9);
fail("Exception expected");
} catch (IOException ex) {
assertEquals(
"Trying to write 10 bytes (Too\\xE2\\x80\\x93Long) into a field of " +
"length 9", ex.getMessage());
}
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputStream dis = new DataInputStream(bais);
assertEquals("Hello", Bytes.readStringFixedSize(dis, 5));
assertEquals("World", Bytes.readStringFixedSize(dis, 18));
assertEquals("", Bytes.readStringFixedSize(dis, 9));
}
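
The failure case above comes down to byte counting: the en dash (\u2013) encodes to three bytes in UTF-8, so "Too\u2013Long" occupies 3 + 3 + 4 = 10 bytes and cannot fit a 9-byte field, which matches the exception message the test expects. A standalone way to verify the count (class name hypothetical):

import java.nio.charset.StandardCharsets;

public class Utf8LengthCheck {
  public static void main(String[] args) {
    byte[] encoded = "Too\u2013Long".getBytes(StandardCharsets.UTF_8);
    System.out.println(encoded.length); // prints 10
  }
}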
}