From c30bce0eaa4359df93eb2a7a468149fc8416c231 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Wed, 3 Aug 2011 20:28:18 +0000 Subject: [PATCH] HBASE-3857 New files git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153646 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/util/DynamicByteBloomFilter.java | 302 ------------------ .../org/apache/hadoop/hbase/util/IdLock.java | 120 +++++++ .../hbase/io/hfile/TestCacheOnWrite.java | 229 +++++++++++++ 3 files changed, 349 insertions(+), 302 deletions(-) delete mode 100644 src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java create mode 100644 src/main/java/org/apache/hadoop/hbase/util/IdLock.java create mode 100644 src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java diff --git a/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java b/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java deleted file mode 100644 index 441167bc13a..00000000000 --- a/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.util; - -import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * Implements a dynamic Bloom filter, as defined in the INFOCOM 2006 paper. - *

- * A dynamic Bloom filter (DBF) makes use of a s * m bit matrix but - * each of the s rows is a standard Bloom filter. The creation - * process of a DBF is iterative. At the start, the DBF is a 1 * m - * bit matrix, i.e., it is composed of a single standard Bloom filter. - * It assumes that nr elements are recorded in the - * initial bit vector, where nr <= n (n is - * the cardinality of the set A to record in the filter). - *

- * As the size of A grows during the execution of the application, - * several keys must be inserted in the DBF. When inserting a key into the DBF, - * one must first get an active Bloom filter in the matrix. A Bloom filter is - * active when the number of recorded keys, nr, is - * strictly less than the current cardinality of A, n. - * If an active Bloom filter is found, the key is inserted and - * nr is incremented by one. On the other hand, if there - * is no active Bloom filter, a new one is created (i.e., a new row is added to - * the matrix) according to the current size of A and the element - * is added in this new Bloom filter and the nr value of - * this new Bloom filter is set to one. A given key is said to belong to the - * DBF if the k positions are set to one in one of the matrix rows. - *

- * Originally created by - * European Commission One-Lab Project 034819. - * - * @see BloomFilter A Bloom filter - * - * @see Theory and Network Applications of Dynamic Bloom Filters - */ -public class DynamicByteBloomFilter implements BloomFilter { - /** Current file format version */ - public static final int VERSION = 2; - /** Maximum number of keys in a dynamic Bloom filter row. */ - protected final int keyInterval; - /** The maximum false positive rate per bloom */ - protected final float errorRate; - /** Hash type */ - protected final int hashType; - /** The number of keys recorded in the current Bloom filter. */ - protected int curKeys; - /** expected size of bloom filter matrix (used during reads) */ - protected int readMatrixSize; - /** The matrix of Bloom filters (contains bloom data only during writes). */ - protected ByteBloomFilter[] matrix; - - /** - * Normal read constructor. Loads bloom filter meta data. - * @param meta stored bloom meta data - * @throws IllegalArgumentException meta data is invalid - */ - public DynamicByteBloomFilter(ByteBuffer meta) throws IllegalArgumentException { - int version = meta.getInt(); - if (version != VERSION) throw new IllegalArgumentException("Bad version"); - - this.keyInterval = meta.getInt(); - this.errorRate = meta.getFloat(); - this.hashType = meta.getInt(); - this.readMatrixSize = meta.getInt(); - this.curKeys = meta.getInt(); - - readSanityCheck(); - - this.matrix = new ByteBloomFilter[1]; - this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); -} - - /** - * Normal write constructor. Note that this doesn't allocate bloom data by - * default. Instead, call allocBloom() before adding entries. - * @param errorRate - * @param hashType type of the hashing function (see org.apache.hadoop.util.hash.Hash). - * @param keyInterval Maximum number of keys to record per Bloom filter row. - * @throws IllegalArgumentException The input parameters were invalid - */ - public DynamicByteBloomFilter(int keyInterval, float errorRate, int hashType) - throws IllegalArgumentException { - this.keyInterval = keyInterval; - this.errorRate = errorRate; - this.hashType = hashType; - this.curKeys = 0; - - if(keyInterval <= 0) { - throw new IllegalArgumentException("keyCount must be > 0"); - } - - this.matrix = new ByteBloomFilter[1]; - this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); -} - - @Override - public void allocBloom() { - this.matrix[0].allocBloom(); - } - - void readSanityCheck() throws IllegalArgumentException { - if (this.curKeys <= 0) { - throw new IllegalArgumentException("last bloom's key count invalid"); - } - - if (this.readMatrixSize <= 0) { - throw new IllegalArgumentException("matrix size must be known"); - } - } - - @Override - public void add(byte []buf, int offset, int len) { - BloomFilter bf = getCurBloom(); - - if (bf == null) { - addRow(); - bf = matrix[matrix.length - 1]; - curKeys = 0; - } - - bf.add(buf, offset, len); - curKeys++; - } - - @Override - public void add(byte []buf) { - add(buf, 0, buf.length); - } - - /** - * Should only be used in tests when writing a bloom filter. - */ - boolean contains(byte [] buf) { - return contains(buf, 0, buf.length); - } - - /** - * Should only be used in tests when writing a bloom filter. - */ - boolean contains(byte [] buf, int offset, int length) { - for (int i = 0; i < matrix.length; i++) { - if (matrix[i].contains(buf, offset, length)) { - return true; - } - } - return false; - } - - @Override - public boolean contains(byte [] buf, ByteBuffer theBloom) { - return contains(buf, 0, buf.length, theBloom); - } - - @Override - public boolean contains(byte[] buf, int offset, int length, - ByteBuffer theBloom) { - if(offset + length > buf.length) { - return false; - } - - // current version assumes uniform size - int bytesPerBloom = this.matrix[0].getByteSize(); - - if(theBloom.limit() != bytesPerBloom * readMatrixSize) { - throw new IllegalArgumentException("Bloom does not match expected size"); - } - - ByteBuffer tmp = theBloom.duplicate(); - - // note: actually searching an array of blooms that have been serialized - for (int m = 0; m < readMatrixSize; ++m) { - tmp.position(m* bytesPerBloom); - tmp.limit(tmp.position() + bytesPerBloom); - boolean match = this.matrix[0].contains(buf, offset, length, tmp.slice()); - if (match) { - return true; - } - } - - // matched no bloom filters - return false; - } - - int bloomCount() { - return Math.max(this.matrix.length, this.readMatrixSize); - } - - @Override - public int getKeyCount() { - return (bloomCount()-1) * this.keyInterval + this.curKeys; - } - - @Override - public int getMaxKeys() { - return bloomCount() * this.keyInterval; - } - - @Override - public int getByteSize() { - return bloomCount() * this.matrix[0].getByteSize(); - } - - @Override - public void compactBloom() { - } - - /** - * Adds a new row to this dynamic Bloom filter. - */ - private void addRow() { - ByteBloomFilter[] tmp = new ByteBloomFilter[matrix.length + 1]; - - for (int i = 0; i < matrix.length; i++) { - tmp[i] = matrix[i]; - } - - tmp[tmp.length-1] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); - tmp[tmp.length-1].allocBloom(); - matrix = tmp; - } - - /** - * Returns the currently-unfilled row in the dynamic Bloom Filter array. - * @return BloomFilter The active standard Bloom filter. - * Null otherwise. - */ - private BloomFilter getCurBloom() { - if (curKeys >= keyInterval) { - return null; - } - - return matrix[matrix.length - 1]; - } - - @Override - public Writable getMetaWriter() { - return new MetaWriter(); - } - - @Override - public Writable getDataWriter() { - return new DataWriter(); - } - - private class MetaWriter implements Writable { - protected MetaWriter() {} - @Override - public void readFields(DataInput arg0) throws IOException { - throw new IOException("Cant read with this class."); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(VERSION); - out.writeInt(keyInterval); - out.writeFloat(errorRate); - out.writeInt(hashType); - out.writeInt(matrix.length); - out.writeInt(curKeys); - } - } - - private class DataWriter implements Writable { - protected DataWriter() {} - @Override - public void readFields(DataInput arg0) throws IOException { - throw new IOException("Cant read with this class."); - } - - @Override - public void write(DataOutput out) throws IOException { - for (int i = 0; i < matrix.length; ++i) { - matrix[i].writeBloom(out); - } - } - } -} diff --git a/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/src/main/java/org/apache/hadoop/hbase/util/IdLock.java new file mode 100644 index 00000000000..e9202dde61c --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -0,0 +1,120 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Allows multiple concurrent clients to lock on a numeric id with a minimal + * memory overhead. The intended usage is as follows: + * + *

+ * IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ * try {
+ *   // User code.
+ * } finally {
+ *   idLock.releaseLockEntry(lockEntry);
+ * }
+ */ +public class IdLock { + + /** An entry returned to the client as a lock object */ + public static class Entry { + private final long id; + private int numWaiters; + private boolean isLocked = true; + + private Entry(long id) { + this.id = id; + } + + public String toString() { + return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" + + isLocked; + } + } + + private ConcurrentMap map = + new ConcurrentHashMap(); + + /** + * Blocks until the lock corresponding to the given id is acquired. + * + * @param id an arbitrary number to lock on + * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release + * the lock + * @throws IOException if interrupted + */ + public Entry getLockEntry(long id) throws IOException { + Entry entry = new Entry(id); + Entry existing; + while ((existing = map.putIfAbsent(entry.id, entry)) != null) { + synchronized (existing) { + if (existing.isLocked) { + ++existing.numWaiters; // Add ourselves to waiters. + while (existing.isLocked) { + try { + existing.wait(); + } catch (InterruptedException e) { + --existing.numWaiters; // Remove ourselves from waiters. + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lock"); + } + } + + --existing.numWaiters; // Remove ourselves from waiters. + existing.isLocked = true; + return existing; + } + // If the entry is not locked, it might already be deleted from the + // map, so we cannot return it. We need to get our entry into the map + // or get someone else's locked entry. + } + } + return entry; + } + + /** + * Must be called in a finally block to decrease the internal counter and + * remove the monitor object for the given id if the caller is the last + * client. + * + * @param entry the return value of {@link #getLockEntry(long)} + */ + public void releaseLockEntry(Entry entry) { + synchronized (entry) { + entry.isLocked = false; + if (entry.numWaiters > 0) { + entry.notify(); + } else { + map.remove(entry.id); + } + } + } + + /** For testing */ + void assertMapEmpty() { + assert map.size() == 0; + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java new file mode 100644 index 00000000000..5e98375dd38 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -0,0 +1,229 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumMap; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex; +import org.apache.hadoop.hbase.io.hfile.HFileReaderV2; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import static org.junit.Assert.*; + +/** + * Tests {@link HFile} cache-on-write functionality for the following block + * types: data blocks, non-root index blocks, and Bloom filter blocks. + */ +@RunWith(Parameterized.class) +public class TestCacheOnWrite { + + private static final Log LOG = LogFactory.getLog(TestCacheOnWrite.class); + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + private Configuration conf; + private FileSystem fs; + private Random rand = new Random(12983177L); + private Path storeFilePath; + private Compression.Algorithm compress; + private CacheOnWriteType cowType; + private BlockCache blockCache; + private String testName; + + private static final int DATA_BLOCK_SIZE = 2048; + private static final int NUM_KV = 25000; + private static final int INDEX_BLOCK_SIZE = 512; + private static final int BLOOM_BLOCK_SIZE = 4096; + + /** The number of valid key types possible in a store file */ + private static final int NUM_VALID_KEY_TYPES = + KeyValue.Type.values().length - 2; + + private static enum CacheOnWriteType { + DATA_BLOCKS(BlockType.DATA, HFile.CACHE_BLOCKS_ON_WRITE_KEY), + BLOOM_BLOCKS(BlockType.BLOOM_CHUNK, + BloomFilterFactory.IO_STOREFILE_BLOOM_CACHE_ON_WRITE), + INDEX_BLOCKS(BlockType.LEAF_INDEX, + HFileBlockIndex.CACHE_INDEX_BLOCKS_ON_WRITE_KEY); + + private final String confKey; + private final BlockType inlineBlockType; + + private CacheOnWriteType(BlockType inlineBlockType, String confKey) { + this.inlineBlockType = inlineBlockType; + this.confKey = confKey; + } + + public boolean shouldBeCached(BlockType blockType) { + return blockType == inlineBlockType + || blockType == BlockType.INTERMEDIATE_INDEX + && inlineBlockType == BlockType.LEAF_INDEX; + } + + public void modifyConf(Configuration conf) { + for (CacheOnWriteType cowType : CacheOnWriteType.values()) + conf.setBoolean(cowType.confKey, cowType == this); + } + + } + + public TestCacheOnWrite(CacheOnWriteType cowType, + Compression.Algorithm compress) { + this.cowType = cowType; + this.compress = compress; + testName = "[cacheOnWrite=" + cowType + ", compress=" + compress + "]"; + } + + @Parameters + public static Collection getParameters() { + List cowTypes = new ArrayList(); + for (CacheOnWriteType cowType : CacheOnWriteType.values()) + for (Compression.Algorithm compress : + HBaseTestingUtility.COMPRESSION_ALGORITHMS) { + cowTypes.add(new Object[] { cowType, compress }); + } + return cowTypes; + } + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); + conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); + conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, + BLOOM_BLOCK_SIZE); + cowType.modifyConf(conf); + fs = FileSystem.get(conf); + blockCache = StoreFile.getBlockCache(conf); + } + + @After + public void tearDown() { + blockCache.evictBlocksByPrefix(""); + } + + @Test + public void testCacheOnWrite() throws IOException { + writeStoreFile(); + readStoreFile(); + } + + private void readStoreFile() throws IOException { + HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs, + storeFilePath, null, false, false); + LOG.info("HFile information: " + reader); + HFileScanner scanner = reader.getScanner(false, false); + assertTrue(testName, scanner.seekTo()); + + long offset = 0; + HFileBlock prevBlock = null; + EnumMap blockCountByType = + new EnumMap(BlockType.class); + + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + HFileBlock block = reader.readBlockData(offset, prevBlock == null ? -1 + : prevBlock.getNextBlockOnDiskSizeWithHeader(), -1, false); + String blockCacheKey = HFile.getBlockCacheKey(reader.getName(), offset); + boolean isCached = blockCache.getBlock(blockCacheKey, true) != null; + boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); + assertEquals(testName + " " + block, shouldBeCached, isCached); + prevBlock = block; + offset += block.getOnDiskSizeWithHeader(); + BlockType bt = block.getBlockType(); + Integer count = blockCountByType.get(bt); + blockCountByType.put(bt, (count == null ? 0 : count) + 1); + } + + LOG.info("Block count by type: " + blockCountByType); + assertEquals( + "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", + blockCountByType.toString()); + + reader.close(); + } + + public static KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = + KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) + { + throw new RuntimeException("Generated an invalid key type: " + keyType + + ". " + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + + public void writeStoreFile() throws IOException { + Path storeFileParentDir = new Path(HBaseTestingUtility.getTestDir(), + "test_cache_on_write"); + StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir, + DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf, + StoreFile.BloomType.ROWCOL, NUM_KV); + + final int rowLen = 32; + for (int i = 0; i < NUM_KV; ++i) { + byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i); + byte[] v = TestHFileWriterV2.randomValue(rand); + int cfLen = rand.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue( + k, 0, rowLen, + k, rowLen, cfLen, + k, rowLen + cfLen, k.length - rowLen - cfLen, + rand.nextLong(), + generateKeyType(rand), + v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + storeFilePath = sfw.getPath(); + } + +}