HBASE-3857 New files

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153646 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-08-03 20:28:18 +00:00
parent 3eadbe11a1
commit c30bce0eaa
3 changed files with 349 additions and 302 deletions

org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java (deleted)

@@ -1,302 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Implements a <i>dynamic Bloom filter</i>, as defined in the INFOCOM 2006 paper.
* <p>
* A dynamic Bloom filter (DBF) makes use of an <code>s * m</code> bit matrix in
* which each of the <code>s</code> rows is a standard Bloom filter. The creation
* process of a DBF is iterative. At the start, the DBF is a <code>1 * m</code>
* bit matrix, i.e., it is composed of a single standard Bloom filter.
* It assumes that <code>n<sub>r</sub></code> elements are recorded in the
* initial bit vector, where <code>n<sub>r</sub> <= n</code> (<code>n</code> is
* the cardinality of the set <code>A</code> to record in the filter).
* <p>
* As the size of <code>A</code> grows during the execution of the application,
* several keys must be inserted in the DBF. When inserting a key into the DBF,
* one must first get an active Bloom filter in the matrix. A Bloom filter is
* active when the number of recorded keys, <code>n<sub>r</sub></code>, is
* strictly less than the current cardinality of <code>A</code>, <code>n</code>.
* If an active Bloom filter is found, the key is inserted and
* <code>n<sub>r</sub></code> is incremented by one. If there is no active Bloom
* filter, a new one is created (i.e., a new row is added to the matrix) according
* to the current size of <code>A</code>, the element is added to this new Bloom
* filter, and its <code>n<sub>r</sub></code> value is set to one. A given key is
* said to belong to the DBF if its <code>k</code> hash positions are set to one
* in at least one of the matrix rows.
* <p>
* Originally created by
* <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
*
* @see BloomFilter A Bloom filter
*
* @see <a href="http://www.cse.fau.edu/~jie/research/publications/Publication_files/infocom2006.pdf">Theory and Network Applications of Dynamic Bloom Filters</a>
*/
public class DynamicByteBloomFilter implements BloomFilter {
/** Current file format version */
public static final int VERSION = 2;
/** Maximum number of keys in a dynamic Bloom filter row. */
protected final int keyInterval;
/** The maximum false positive rate per bloom */
protected final float errorRate;
/** Hash type */
protected final int hashType;
/** The number of keys recorded in the current Bloom filter. */
protected int curKeys;
/** expected size of bloom filter matrix (used during reads) */
protected int readMatrixSize;
/** The matrix of Bloom filters (contains bloom data only during writes). */
protected ByteBloomFilter[] matrix;
/**
* Normal read constructor. Loads bloom filter meta data.
* @param meta stored bloom meta data
* @throws IllegalArgumentException if the stored meta data is invalid
*/
public DynamicByteBloomFilter(ByteBuffer meta) throws IllegalArgumentException {
int version = meta.getInt();
if (version != VERSION) throw new IllegalArgumentException("Bad version");
this.keyInterval = meta.getInt();
this.errorRate = meta.getFloat();
this.hashType = meta.getInt();
this.readMatrixSize = meta.getInt();
this.curKeys = meta.getInt();
readSanityCheck();
this.matrix = new ByteBloomFilter[1];
this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0);
}
/**
* Normal write constructor. Note that this doesn't allocate bloom data by
* default. Instead, call allocBloom() before adding entries.
* @param keyInterval Maximum number of keys to record per Bloom filter row.
* @param errorRate Maximum false positive rate of each Bloom filter row.
* @param hashType type of the hashing function (see <code>org.apache.hadoop.util.hash.Hash</code>).
* @throws IllegalArgumentException The input parameters were invalid
*/
public DynamicByteBloomFilter(int keyInterval, float errorRate, int hashType)
throws IllegalArgumentException {
this.keyInterval = keyInterval;
this.errorRate = errorRate;
this.hashType = hashType;
this.curKeys = 0;
if(keyInterval <= 0) {
throw new IllegalArgumentException("keyCount must be > 0");
}
this.matrix = new ByteBloomFilter[1];
this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0);
}
@Override
public void allocBloom() {
this.matrix[0].allocBloom();
}
void readSanityCheck() throws IllegalArgumentException {
if (this.curKeys <= 0) {
throw new IllegalArgumentException("last bloom's key count invalid");
}
if (this.readMatrixSize <= 0) {
throw new IllegalArgumentException("matrix size must be known");
}
}
@Override
public void add(byte []buf, int offset, int len) {
BloomFilter bf = getCurBloom();
if (bf == null) {
addRow();
bf = matrix[matrix.length - 1];
curKeys = 0;
}
bf.add(buf, offset, len);
curKeys++;
}
@Override
public void add(byte []buf) {
add(buf, 0, buf.length);
}
/**
* Should only be used in tests, and only while writing a Bloom filter (the matrix holds bloom data only during writes).
*/
boolean contains(byte [] buf) {
return contains(buf, 0, buf.length);
}
/**
* Should only be used in tests, and only while writing a Bloom filter (the matrix holds bloom data only during writes).
*/
boolean contains(byte [] buf, int offset, int length) {
for (int i = 0; i < matrix.length; i++) {
if (matrix[i].contains(buf, offset, length)) {
return true;
}
}
return false;
}
@Override
public boolean contains(byte [] buf, ByteBuffer theBloom) {
return contains(buf, 0, buf.length, theBloom);
}
@Override
public boolean contains(byte[] buf, int offset, int length,
ByteBuffer theBloom) {
if(offset + length > buf.length) {
return false;
}
// current version assumes uniform size
int bytesPerBloom = this.matrix[0].getByteSize();
if(theBloom.limit() != bytesPerBloom * readMatrixSize) {
throw new IllegalArgumentException("Bloom does not match expected size");
}
ByteBuffer tmp = theBloom.duplicate();
// note: actually searching an array of blooms that have been serialized
for (int m = 0; m < readMatrixSize; ++m) {
tmp.position(m* bytesPerBloom);
tmp.limit(tmp.position() + bytesPerBloom);
boolean match = this.matrix[0].contains(buf, offset, length, tmp.slice());
if (match) {
return true;
}
}
// matched no bloom filters
return false;
}
int bloomCount() {
return Math.max(this.matrix.length, this.readMatrixSize);
}
@Override
public int getKeyCount() {
return (bloomCount()-1) * this.keyInterval + this.curKeys;
}
@Override
public int getMaxKeys() {
return bloomCount() * this.keyInterval;
}
@Override
public int getByteSize() {
return bloomCount() * this.matrix[0].getByteSize();
}
@Override
public void compactBloom() {
}
/**
* Adds a new row to <i>this</i> dynamic Bloom filter.
*/
private void addRow() {
ByteBloomFilter[] tmp = new ByteBloomFilter[matrix.length + 1];
for (int i = 0; i < matrix.length; i++) {
tmp[i] = matrix[i];
}
tmp[tmp.length-1] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0);
tmp[tmp.length-1].allocBloom();
matrix = tmp;
}
/**
* Returns the currently-unfilled row in the dynamic Bloom filter array.
* @return the active standard Bloom filter, or <code>null</code> if the
* current row has reached its key interval.
*/
private BloomFilter getCurBloom() {
if (curKeys >= keyInterval) {
return null;
}
return matrix[matrix.length - 1];
}
@Override
public Writable getMetaWriter() {
return new MetaWriter();
}
@Override
public Writable getDataWriter() {
return new DataWriter();
}
private class MetaWriter implements Writable {
protected MetaWriter() {}
@Override
public void readFields(DataInput arg0) throws IOException {
throw new IOException("Cant read with this class.");
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(VERSION);
out.writeInt(keyInterval);
out.writeFloat(errorRate);
out.writeInt(hashType);
out.writeInt(matrix.length);
out.writeInt(curKeys);
}
}
private class DataWriter implements Writable {
protected DataWriter() {}
@Override
public void readFields(DataInput arg0) throws IOException {
throw new IOException("Cant read with this class.");
}
@Override
public void write(DataOutput out) throws IOException {
for (int i = 0; i < matrix.length; ++i) {
matrix[i].writeBloom(out);
}
}
}
}
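
For reference, here is a minimal write-side usage sketch of the class removed above. It is not part of this commit; the demo class, the key strings, and the use of MURMUR_HASH from org.apache.hadoop.hbase.util.Hash are illustrative assumptions, but the DynamicByteBloomFilter calls match the public API shown in the listing.

import org.apache.hadoop.hbase.util.DynamicByteBloomFilter;
import org.apache.hadoop.hbase.util.Hash;

public class DynamicBloomDemo {
  public static void main(String[] args) {
    // Each row holds at most 1000 keys at a 1% target false positive rate.
    DynamicByteBloomFilter dbf =
        new DynamicByteBloomFilter(1000, 0.01f, Hash.MURMUR_HASH);
    dbf.allocBloom(); // the write constructor does not allocate bloom data

    for (int i = 0; i < 2500; i++) {
      dbf.add(("row-" + i).getBytes());
    }

    // 2500 keys at 1000 keys per row: the matrix has grown to three rows.
    System.out.println("keys=" + dbf.getKeyCount()
        + ", capacity=" + dbf.getMaxKeys()
        + ", bytes=" + dbf.getByteSize());
  }
}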

org/apache/hadoop/hbase/util/IdLock.java (added)

@@ -0,0 +1,120 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Allows multiple concurrent clients to lock on a numeric id with a minimal
* memory overhead. The intended usage is as follows:
*
* <pre>
* IdLock.Entry lockEntry = idLock.getLockEntry(id);
* try {
* // User code.
* } finally {
* idLock.releaseLockEntry(lockEntry);
* }</pre>
*/
public class IdLock {
/** An entry returned to the client as a lock object */
public static class Entry {
private final long id;
private int numWaiters;
private boolean isLocked = true;
private Entry(long id) {
this.id = id;
}
public String toString() {
return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
+ isLocked;
}
}
private ConcurrentMap<Long, Entry> map =
new ConcurrentHashMap<Long, Entry>();
/**
* Blocks until the lock corresponding to the given id is acquired.
*
* @param id an arbitrary number to lock on
* @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
* the lock
* @throws IOException if interrupted
*/
public Entry getLockEntry(long id) throws IOException {
Entry entry = new Entry(id);
Entry existing;
while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
synchronized (existing) {
if (existing.isLocked) {
++existing.numWaiters; // Add ourselves to waiters.
while (existing.isLocked) {
try {
existing.wait();
} catch (InterruptedException e) {
--existing.numWaiters; // Remove ourselves from waiters.
throw new InterruptedIOException(
"Interrupted waiting to acquire sparse lock");
}
}
--existing.numWaiters; // Remove ourselves from waiters.
existing.isLocked = true;
return existing;
}
// If the entry is not locked, it might already be deleted from the
// map, so we cannot return it. We need to get our entry into the map
// or get someone else's locked entry.
}
}
return entry;
}
/**
* Must be called in a finally block to decrease the internal counter and
* remove the monitor object for the given id if the caller is the last
* client.
*
* @param entry the return value of {@link #getLockEntry(long)}
*/
public void releaseLockEntry(Entry entry) {
synchronized (entry) {
entry.isLocked = false;
if (entry.numWaiters > 0) {
entry.notify();
} else {
map.remove(entry.id);
}
}
}
/** For testing */
void assertMapEmpty() {
assert map.size() == 0;
}
}
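
A minimal usage sketch for the new IdLock follows. The thread pool and the literal id 42 are illustrative assumptions, not part of this commit; the point is that callers contending on the same id serialize, while callers on different ids do not block each other.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hbase.util.IdLock;

public class IdLockDemo {
  public static void main(String[] args) throws Exception {
    final IdLock offsetLock = new IdLock();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int t = 0; t < 4; t++) {
      pool.submit(new Runnable() {
        public void run() {
          try {
            // All four threads contend on id 42; only one holds it at a time.
            IdLock.Entry lockEntry = offsetLock.getLockEntry(42L);
            try {
              Thread.sleep(10); // critical section, e.g. load a block into the cache
            } finally {
              offsetLock.releaseLockEntry(lockEntry);
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }
    pool.shutdown();
  }
}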

org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (added)

@@ -0,0 +1,229 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.*;
/**
* Tests {@link HFile} cache-on-write functionality for the following block
* types: data blocks, non-root index blocks, and Bloom filter blocks.
*/
@RunWith(Parameterized.class)
public class TestCacheOnWrite {
private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class);
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private Configuration conf;
private FileSystem fs;
private Random rand = new Random(12983177L);
private Path storeFilePath;
private Compression.Algorithm compress;
private CacheOnWriteType cowType;
private BlockCache blockCache;
private String testName;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 25000;
private static final int INDEX_BLOCK_SIZE = 512;
private static final int BLOOM_BLOCK_SIZE = 4096;
/** The number of valid key types possible in a store file */
private static final int NUM_VALID_KEY_TYPES =
KeyValue.Type.values().length - 2;
private static enum CacheOnWriteType {
DATA_BLOCKS(BlockType.DATA, HFile.CACHE_BLOCKS_ON_WRITE_KEY),
BLOOM_BLOCKS(BlockType.BLOOM_CHUNK,
BloomFilterFactory.IO_STOREFILE_BLOOM_CACHE_ON_WRITE),
INDEX_BLOCKS(BlockType.LEAF_INDEX,
HFileBlockIndex.CACHE_INDEX_BLOCKS_ON_WRITE_KEY);
private final String confKey;
private final BlockType inlineBlockType;
private CacheOnWriteType(BlockType inlineBlockType, String confKey) {
this.inlineBlockType = inlineBlockType;
this.confKey = confKey;
}
public boolean shouldBeCached(BlockType blockType) {
return blockType == inlineBlockType
|| blockType == BlockType.INTERMEDIATE_INDEX
&& inlineBlockType == BlockType.LEAF_INDEX;
}
public void modifyConf(Configuration conf) {
for (CacheOnWriteType cowType : CacheOnWriteType.values())
conf.setBoolean(cowType.confKey, cowType == this);
}
}
public TestCacheOnWrite(CacheOnWriteType cowType,
Compression.Algorithm compress) {
this.cowType = cowType;
this.compress = compress;
testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]";
}
@Parameters
public static Collection<Object[]> getParameters() {
List<Object[]> cowTypes = new ArrayList<Object[]>();
for (CacheOnWriteType cowType : CacheOnWriteType.values())
for (Compression.Algorithm compress :
HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
cowTypes.add(new Object[] { cowType, compress });
}
return cowTypes;
}
@Before
public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration();
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
BLOOM_BLOCK_SIZE);
cowType.modifyConf(conf);
fs = FileSystem.get(conf);
blockCache = StoreFile.getBlockCache(conf);
}
@After
public void tearDown() {
blockCache.evictBlocksByPrefix("");
}
@Test
public void testCacheOnWrite() throws IOException {
writeStoreFile();
readStoreFile();
}
private void readStoreFile() throws IOException {
HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
storeFilePath, null, false, false);
LOG.info("HFile information: " + reader);
HFileScanner scanner = reader.getScanner(false, false);
assertTrue(testName, scanner.seekTo());
long offset = 0;
HFileBlock prevBlock = null;
EnumMap<BlockType, Integer> blockCountByType =
new EnumMap<BlockType, Integer>(BlockType.class);
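// Walk every block that precedes the load-on-open section and check that
// exactly the block types selected by this cowType were cached at write time.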
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
HFileBlock block = reader.readBlockData(offset, prevBlock == null ? -1
: prevBlock.getNextBlockOnDiskSizeWithHeader(), -1, false);
String blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true) != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
assertEquals(testName + " " + block, shouldBeCached, isCached);
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
BlockType bt = block.getBlockType();
Integer count = blockCountByType.get(bt);
blockCountByType.put(bt, (count == null ? 0 : count) + 1);
}
LOG.info("Block count by type: " + blockCountByType);
assertEquals(
"{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
blockCountByType.toString());
reader.close();
}
public static KeyValue.Type generateKeyType(Random rand) {
if (rand.nextBoolean()) {
// Let's make half of KVs puts.
return KeyValue.Type.Put;
} else {
KeyValue.Type keyType =
KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum)
{
throw new RuntimeException("Generated an invalid key type: " + keyType
+ ". " + "Probably the layout of KeyValue.Type has changed.");
}
return keyType;
}
}
public void writeStoreFile() throws IOException {
Path storeFileParentDir = new Path(HBaseTestingUtility.getTestDir(),
"test_cache_on_write");
StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
StoreFile.BloomType.ROWCOL, NUM_KV);
final int rowLen = 32;
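// Split each random key into a 32-byte row, a random-length column family,
// and the remaining bytes as the qualifier.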
for (int i = 0; i < NUM_KV; ++i) {
byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
byte[] v = TestHFileWriterV2.randomValue(rand);
int cfLen = rand.nextInt(k.length - rowLen + 1);
KeyValue kv = new KeyValue(
k, 0, rowLen,
k, rowLen, cfLen,
k, rowLen + cfLen, k.length - rowLen - cfLen,
rand.nextLong(),
generateKeyType(rand),
v, 0, v.length);
sfw.append(kv);
}
sfw.close();
storeFilePath = sfw.getPath();
}
}
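
For completeness, here is a short sketch of how a caller could enable all three cache-on-write paths at once, using the same configuration keys this test flips one at a time through modifyConf. The helper class is an illustrative assumption, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.util.BloomFilterFactory;

public class CacheOnWriteConfig {
  /** Illustrative helper: cache data, leaf index, and Bloom chunk blocks on write. */
  static void enableAllCacheOnWrite(Configuration conf) {
    conf.setBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY, true);
    conf.setBoolean(HFileBlockIndex.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_CACHE_ON_WRITE, true);
  }
}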