From d2c1d20bfc02ae69ae4e9dd1e900ebbb599aa109 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Wed, 3 Aug 2011 20:30:27 +0000 Subject: [PATCH] HBASE-3857 New test classes. git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153647 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/io/hfile/TestFixedFileTrailer.java | 227 +++++++ .../hadoop/hbase/io/hfile/TestHFileBlock.java | 499 +++++++++++++++ .../hbase/io/hfile/TestHFileBlockIndex.java | 602 ++++++++++++++++++ .../hbase/io/hfile/TestHFileReaderV1.java | 89 +++ .../hbase/io/hfile/TestHFileWriterV2.java | 256 ++++++++ .../regionserver/TestCompoundBloomFilter.java | 353 ++++++++++ .../apache/hadoop/hbase/util/TestIdLock.java | 111 ++++ 7 files changed, 2137 insertions(+) create mode 100644 src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java create mode 100644 src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java create mode 100644 src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java create mode 100644 src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java create mode 100644 src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java create mode 100644 src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java create mode 100644 src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java new file mode 100644 index 00000000000..610c4e7cc75 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java @@ -0,0 +1,227 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import static org.junit.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; + +@RunWith(Parameterized.class) +public class TestFixedFileTrailer { + + private static final Log LOG = LogFactory.getLog(TestFixedFileTrailer.class); + + /** The number of used fields by version. Indexed by version minus one. */ + private static final int[] NUM_FIELDS_BY_VERSION = new int[] { 8, 13 }; + + private HBaseTestingUtility util = new HBaseTestingUtility(); + private FileSystem fs; + private ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private int version; + + static { + assert NUM_FIELDS_BY_VERSION.length == HFile.MAX_FORMAT_VERSION + - HFile.MIN_FORMAT_VERSION + 1; + } + + public TestFixedFileTrailer(int version) { + this.version = version; + } + + @Parameters + public static Collection getParameters() { + List versionsToTest = new ArrayList(); + for (int v = HFile.MIN_FORMAT_VERSION; v <= HFile.MAX_FORMAT_VERSION; ++v) + versionsToTest.add(new Integer[] { v } ); + return versionsToTest; + } + + @Before + public void setUp() throws IOException { + fs = FileSystem.get(util.getConfiguration()); + } + + @Test + public void testTrailer() throws IOException { + FixedFileTrailer t = new FixedFileTrailer(version); + t.setDataIndexCount(3); + t.setEntryCount(((long) Integer.MAX_VALUE) + 1); + + if (version == 1) { + t.setFileInfoOffset(876); + } + + if (version == 2) { + t.setLastDataBlockOffset(291); + t.setNumDataIndexLevels(3); + t.setComparatorClass(KeyValue.KEY_COMPARATOR.getClass()); + t.setFirstDataBlockOffset(9081723123L); // Completely unrealistic. + t.setUncompressedDataIndexSize(827398717L); // Something random. + } + + t.setLoadOnOpenOffset(128); + t.setMetaIndexCount(7); + + t.setTotalUncompressedBytes(129731987); + + { + DataOutputStream dos = new DataOutputStream(baos); // Limited scope. + t.serialize(dos); + dos.flush(); + assertEquals(dos.size(), FixedFileTrailer.getTrailerSize(version)); + } + + byte[] bytes = baos.toByteArray(); + baos.reset(); + + assertEquals(bytes.length, FixedFileTrailer.getTrailerSize(version)); + + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + + // Finished writing, trying to read. + { + DataInputStream dis = new DataInputStream(bais); + FixedFileTrailer t2 = new FixedFileTrailer(version); + t2.deserialize(dis); + assertEquals(-1, bais.read()); // Ensure we have read everything. + checkLoadedTrailer(version, t, t2); + } + + // Now check what happens if the trailer is corrupted. 
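+ // The serialized trailer ends with the format version, so overwriting its
+ // last byte with an out-of-range value below should make deserialization
+ // fail with an "Invalid HFile version" error.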
+ Path trailerPath = new Path(HBaseTestingUtility.getTestDir(), "trailer_" + + version); + + { + for (byte invalidVersion : new byte[] { HFile.MIN_FORMAT_VERSION - 1, + HFile.MAX_FORMAT_VERSION + 1}) { + bytes[bytes.length - 1] = invalidVersion; + writeTrailer(trailerPath, null, bytes); + try { + readTrailer(trailerPath); + fail("Exception expected"); + } catch (IOException ex) { + // Make it easy to debug this. + String msg = ex.getMessage(); + String cleanMsg = msg.replaceAll( + "^(java(\\.[a-zA-Z]+)+:\\s+)?|\\s+\\(.*\\)\\s*$", ""); + assertEquals("Actual exception message is \"" + msg + "\".\n" + + "Cleaned-up message", // will be followed by " expected: ..." + "Invalid HFile version: " + invalidVersion, cleanMsg); + LOG.info("Got an expected exception: " + msg); + } + } + + } + + // Now write the trailer into a file and auto-detect the version. + writeTrailer(trailerPath, t, null); + + FixedFileTrailer t4 = readTrailer(trailerPath); + + checkLoadedTrailer(version, t, t4); + + String trailerStr = t.toString(); + assertEquals("Invalid number of fields in the string representation " + + "of the trailer: " + trailerStr, NUM_FIELDS_BY_VERSION[version - 1], + trailerStr.split(", ").length); + assertEquals(trailerStr, t4.toString()); + } + + private FixedFileTrailer readTrailer(Path trailerPath) throws IOException { + FSDataInputStream fsdis = fs.open(trailerPath); + FixedFileTrailer trailerRead = FixedFileTrailer.readFromStream(fsdis, + fs.getFileStatus(trailerPath).getLen()); + fsdis.close(); + return trailerRead; + } + + private void writeTrailer(Path trailerPath, FixedFileTrailer t, + byte[] useBytesInstead) throws IOException { + assert (t == null) != (useBytesInstead == null); // Expect one non-null. + + FSDataOutputStream fsdos = fs.create(trailerPath); + fsdos.write(135); // to make deserializer's job less trivial + if (useBytesInstead != null) { + fsdos.write(useBytesInstead); + } else { + t.serialize(fsdos); + } + fsdos.close(); + } + + private void checkLoadedTrailer(int version, FixedFileTrailer expected, + FixedFileTrailer loaded) throws IOException { + assertEquals(version, loaded.getVersion()); + assertEquals(expected.getDataIndexCount(), loaded.getDataIndexCount()); + + assertEquals(Math.min(expected.getEntryCount(), + version == 1 ? 
Integer.MAX_VALUE : Long.MAX_VALUE), + loaded.getEntryCount()); + + if (version == 1) { + assertEquals(expected.getFileInfoOffset(), loaded.getFileInfoOffset()); + } + + if (version == 2) { + assertEquals(expected.getLastDataBlockOffset(), + loaded.getLastDataBlockOffset()); + assertEquals(expected.getNumDataIndexLevels(), + loaded.getNumDataIndexLevels()); + assertEquals(expected.createComparator().getClass().getName(), + loaded.createComparator().getClass().getName()); + assertEquals(expected.getFirstDataBlockOffset(), + loaded.getFirstDataBlockOffset()); + assertTrue( + expected.createComparator() instanceof KeyValue.KeyComparator); + assertEquals(expected.getUncompressedDataIndexSize(), + loaded.getUncompressedDataIndexSize()); + } + + assertEquals(expected.getLoadOnOpenDataOffset(), + loaded.getLoadOnOpenDataOffset()); + assertEquals(expected.getMetaIndexCount(), loaded.getMetaIndexCount()); + + assertEquals(expected.getTotalUncompressedBytes(), + loaded.getTotalUncompressedBytes()); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java new file mode 100644 index 00000000000..ddd5ebc475c --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -0,0 +1,499 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.*; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.io.compress.Compressor; + +import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; +import org.junit.Before; +import org.junit.Test; + +public class TestHFileBlock { + + private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true }; + + private static final Log LOG = LogFactory.getLog(TestHFileBlock.class); + + static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { + NONE, GZ }; + + // In case we need to temporarily switch some test cases to just test gzip. + static final Compression.Algorithm[] GZIP_ONLY = { GZ }; + + private static final int NUM_TEST_BLOCKS = 1000; + + private static final int NUM_READER_THREADS = 26; + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + private FileSystem fs; + private int uncompressedSizeV1; + + @Before + public void setUp() throws IOException { + fs = FileSystem.get(TEST_UTIL.getConfiguration()); + TEST_UTIL.initTestDir(); + } + + public void writeTestBlockContents(DataOutputStream dos) throws IOException { + // This compresses really well. + for (int i = 0; i < 1000; ++i) + dos.writeInt(i / 100); + } + + public byte[] createTestV1Block(Compression.Algorithm algo) + throws IOException { + Compressor compressor = algo.getCompressor(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStream os = algo.createCompressionStream(baos, compressor, 0); + DataOutputStream dos = new DataOutputStream(os); + BlockType.META.write(dos); // Let's make this a meta block. + writeTestBlockContents(dos); + uncompressedSizeV1 = dos.size(); + dos.flush(); + algo.returnCompressor(compressor); + return baos.toByteArray(); + } + + private byte[] createTestV2Block(Compression.Algorithm algo) + throws IOException { + final BlockType blockType = BlockType.DATA; + HFileBlock.Writer hbw = new HFileBlock.Writer(algo); + DataOutputStream dos = hbw.startWriting(blockType, false); + writeTestBlockContents(dos); + byte[] headerAndData = hbw.getHeaderAndData(); + assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); + hbw.releaseCompressor(); + return headerAndData; + } + + public String createTestBlockStr(Compression.Algorithm algo) + throws IOException { + byte[] testV2Block = createTestV2Block(algo); + int osOffset = HFileBlock.HEADER_SIZE + 9; + if (osOffset < testV2Block.length) { + // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid + // variations across operating systems. + // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. 
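+ // (Per RFC 1952, the gzip member header is ID1, ID2, CM, FLG, a four-byte
+ // MTIME, XFL and then the OS byte -- ten bytes in total -- which is why the
+ // OS byte sits at offset 9 from the start of the compressed stream, i.e. at
+ // HEADER_SIZE + 9 within the block.)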
+ testV2Block[osOffset] = 3; + } + return Bytes.toStringBinary(testV2Block); + } + + @Test + public void testNoCompression() throws IOException { + assertEquals(4000 + HFileBlock.HEADER_SIZE, createTestV2Block(NONE).length); + } + + @Test + public void testGzipCompression() throws IOException { + assertEquals( + "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF" + + "\\xFF\\xFF\\xFF\\xFF" + // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html + + "\\x1F\\x8B" // gzip magic signature + + "\\x08" // Compression method: 8 = "deflate" + + "\\x00" // Flags + + "\\x00\\x00\\x00\\x00" // mtime + + "\\x00" // XFL (extra flags) + // OS (0 = FAT filesystems, 3 = Unix). However, this field + // sometimes gets set to 0 on Linux and Mac, so we reset it to 3. + + "\\x03" + + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa" + + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" + + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00", + createTestBlockStr(GZ)); + } + + @Test + public void testReaderV1() throws IOException { + for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { + for (boolean pread : new boolean[] { false, true }) { + byte[] block = createTestV1Block(algo); + Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v1_" + + algo); + LOG.info("Creating temporary file at " + path); + FSDataOutputStream os = fs.create(path); + int totalSize = 0; + int numBlocks = 50; + for (int i = 0; i < numBlocks; ++i) { + os.write(block); + totalSize += block.length; + } + os.close(); + + FSDataInputStream is = fs.open(path); + HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo, + totalSize); + HFileBlock b; + int numBlocksRead = 0; + long pos = 0; + while (pos < totalSize) { + b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread); + b.sanityCheck(); + pos += block.length; + numBlocksRead++; + } + assertEquals(numBlocks, numBlocksRead); + is.close(); + } + } + } + + @Test + public void testReaderV2() throws IOException { + for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { + for (boolean pread : new boolean[] { false, true }) { + Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v2_" + + algo); + FSDataOutputStream os = fs.create(path); + HFileBlock.Writer hbw = new HFileBlock.Writer(algo); + long totalSize = 0; + for (int blockId = 0; blockId < 2; ++blockId) { + DataOutputStream dos = hbw.startWriting(BlockType.DATA, false); + for (int i = 0; i < 1234; ++i) + dos.writeInt(i); + hbw.writeHeaderAndData(os); + totalSize += hbw.getOnDiskSizeWithHeader(); + } + os.close(); + + FSDataInputStream is = fs.open(path); + HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo, + totalSize); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + is.close(); + + b.sanityCheck(); + assertEquals(4936, b.getUncompressedSizeWithoutHeader()); + assertEquals(algo == GZ ? 
2173 : 4936, b.getOnDiskSizeWithoutHeader()); + String blockStr = b.toString(); + + if (algo == GZ) { + is = fs.open(path); + hbr = new HFileBlock.FSReaderV2(is, algo, totalSize); + b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1, pread); + assertEquals(blockStr, b.toString()); + int wrongCompressedSize = 2172; + try { + b = hbr.readBlockData(0, wrongCompressedSize + + HFileBlock.HEADER_SIZE, -1, pread); + fail("Exception expected"); + } catch (IOException ex) { + String expectedPrefix = "On-disk size without header provided is " + + wrongCompressedSize + ", but block header contains " + + b.getOnDiskSizeWithoutHeader() + "."; + assertTrue("Invalid exception message: '" + ex.getMessage() + + "'.\nMessage is expected to start with: '" + expectedPrefix + + "'", ex.getMessage().startsWith(expectedPrefix)); + } + is.close(); + } + } + } + } + + @Test + public void testPreviousOffset() throws IOException { + for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { + for (boolean pread : BOOLEAN_VALUES) { + for (boolean cacheOnWrite : BOOLEAN_VALUES) { + Random rand = defaultRandom(); + LOG.info("Compression algorithm: " + algo + ", pread=" + pread); + Path path = new Path(HBaseTestingUtility.getTestDir(), "prev_offset"); + List expectedOffsets = new ArrayList(); + List expectedPrevOffsets = new ArrayList(); + List expectedTypes = new ArrayList(); + List expectedContents = cacheOnWrite + ? new ArrayList() : null; + long totalSize = writeBlocks(rand, algo, path, expectedOffsets, + expectedPrevOffsets, expectedTypes, expectedContents, true); + + FSDataInputStream is = fs.open(path); + HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo, + totalSize); + long curOffset = 0; + for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { + if (!pread) { + assertEquals(is.getPos(), curOffset + (i == 0 ? 0 : + HFileBlock.HEADER_SIZE)); + } + + assertEquals(expectedOffsets.get(i).longValue(), curOffset); + + LOG.info("Reading block #" + i + " at offset " + curOffset); + HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread); + LOG.info("Block #" + i + ": " + b); + assertEquals("Invalid block #" + i + "'s type:", + expectedTypes.get(i), b.getBlockType()); + assertEquals("Invalid previous block offset for block " + i + + " of " + "type " + b.getBlockType() + ":", + (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset()); + b.sanityCheck(); + assertEquals(curOffset, b.getOffset()); + + // Now re-load this block knowing the on-disk size. This tests a + // different branch in the loader. + HFileBlock b2 = hbr.readBlockData(curOffset, + b.getOnDiskSizeWithHeader(), -1, pread); + b2.sanityCheck(); + + assertEquals(b.getBlockType(), b2.getBlockType()); + assertEquals(b.getOnDiskSizeWithoutHeader(), + b2.getOnDiskSizeWithoutHeader()); + assertEquals(b.getOnDiskSizeWithHeader(), + b2.getOnDiskSizeWithHeader()); + assertEquals(b.getUncompressedSizeWithoutHeader(), + b2.getUncompressedSizeWithoutHeader()); + assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset()); + assertEquals(curOffset, b2.getOffset()); + + curOffset += b.getOnDiskSizeWithHeader(); + + if (cacheOnWrite) { + // In the cache-on-write mode we store uncompressed bytes so we + // can compare them to what was read by the block reader. 
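+ // (expectedContents was populated in writeBlocks() from
+ // getUncompressedBufferWithHeader(), so both buffers below include the
+ // block header.)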
+ + ByteBuffer bufRead = b.getBufferWithHeader(); + ByteBuffer bufExpected = expectedContents.get(i); + boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(), + bufRead.arrayOffset(), bufRead.limit(), + bufExpected.array(), bufExpected.arrayOffset(), + bufExpected.limit()) == 0; + String wrongBytesMsg = ""; + + if (!bytesAreCorrect) { + // Optimization: only construct an error message in case we + // will need it. + wrongBytesMsg = "Expected bytes in block #" + i + " (algo=" + + algo + ", pread=" + pread + "):\n"; + wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), + bufExpected.arrayOffset(), Math.min(32, + bufExpected.limit())) + + ", actual:\n" + + Bytes.toStringBinary(bufRead.array(), + bufRead.arrayOffset(), Math.min(32, bufRead.limit())); + } + + assertTrue(wrongBytesMsg, bytesAreCorrect); + } + } + + assertEquals(curOffset, fs.getFileStatus(path).getLen()); + is.close(); + } + } + } + } + + private Random defaultRandom() { + return new Random(189237); + } + + private class BlockReaderThread implements Callable { + private final String clientId; + private final HFileBlock.FSReader hbr; + private final List offsets; + private final List types; + private final long fileSize; + + public BlockReaderThread(String clientId, + HFileBlock.FSReader hbr, List offsets, List types, + long fileSize) { + this.clientId = clientId; + this.offsets = offsets; + this.hbr = hbr; + this.types = types; + this.fileSize = fileSize; + } + + @Override + public Boolean call() throws Exception { + Random rand = new Random(clientId.hashCode()); + long endTime = System.currentTimeMillis() + 10000; + int numBlocksRead = 0; + int numPositionalRead = 0; + int numWithOnDiskSize = 0; + while (System.currentTimeMillis() < endTime) { + int blockId = rand.nextInt(NUM_TEST_BLOCKS); + long offset = offsets.get(blockId); + boolean pread = rand.nextBoolean(); + boolean withOnDiskSize = rand.nextBoolean(); + long expectedSize = + (blockId == NUM_TEST_BLOCKS - 1 ? fileSize + : offsets.get(blockId + 1)) - offset; + + HFileBlock b; + try { + long onDiskSizeArg = withOnDiskSize ? 
expectedSize : -1; + b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread); + } catch (IOException ex) { + LOG.error("Error in client " + clientId + " trying to read block at " + + offset + ", pread=" + pread + ", withOnDiskSize=" + + withOnDiskSize, ex); + return false; + } + + assertEquals(types.get(blockId), b.getBlockType()); + assertEquals(expectedSize, b.getOnDiskSizeWithHeader()); + assertEquals(offset, b.getOffset()); + + ++numBlocksRead; + if (pread) + ++numPositionalRead; + if (withOnDiskSize) + ++numWithOnDiskSize; + } + LOG.info("Client " + clientId + " successfully read " + numBlocksRead + + " blocks (with pread: " + numPositionalRead + ", with onDiskSize " + + "specified: " + numWithOnDiskSize + ")"); + return true; + } + + } + + @Test + public void testConcurrentReading() throws Exception { + for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) { + Path path = + new Path(HBaseTestingUtility.getTestDir(), "concurrent_reading"); + Random rand = defaultRandom(); + List offsets = new ArrayList(); + List types = new ArrayList(); + writeBlocks(rand, compressAlgo, path, offsets, null, types, null, false); + FSDataInputStream is = fs.open(path); + long fileSize = fs.getFileStatus(path).getLen(); + HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, compressAlgo, + fileSize); + + Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS); + ExecutorCompletionService ecs = + new ExecutorCompletionService(exec); + + for (int i = 0; i < NUM_READER_THREADS; ++i) { + ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr, + offsets, types, fileSize)); + } + + for (int i = 0; i < NUM_READER_THREADS; ++i) { + Future result = ecs.take(); + assertTrue(result.get()); + LOG.info(String.valueOf(i + 1) + + " reader threads finished successfully (algo=" + compressAlgo + + ")"); + } + + is.close(); + } + } + + private long writeBlocks(Random rand, Compression.Algorithm compressAlgo, + Path path, List expectedOffsets, List expectedPrevOffsets, + List expectedTypes, List expectedContents, + boolean detailedLogging) throws IOException { + boolean cacheOnWrite = expectedContents != null; + FSDataOutputStream os = fs.create(path); + HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo); + Map prevOffsetByType = new HashMap(); + long totalSize = 0; + for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { + int blockTypeOrdinal = rand.nextInt(BlockType.values().length); + BlockType bt = BlockType.values()[blockTypeOrdinal]; + DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite); + for (int j = 0; j < rand.nextInt(500); ++j) { + // This might compress well. + dos.writeShort(i + 1); + dos.writeInt(j + 1); + } + + if (expectedOffsets != null) + expectedOffsets.add(os.getPos()); + + if (expectedPrevOffsets != null) { + Long prevOffset = prevOffsetByType.get(bt); + expectedPrevOffsets.add(prevOffset != null ? 
prevOffset : -1); + prevOffsetByType.put(bt, os.getPos()); + } + + expectedTypes.add(bt); + + hbw.writeHeaderAndData(os); + totalSize += hbw.getOnDiskSizeWithHeader(); + + if (cacheOnWrite) + expectedContents.add(hbw.getUncompressedBufferWithHeader()); + + if (detailedLogging) { + LOG.info("Writing block #" + i + " of type " + bt + + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader() + + " at offset " + os.getPos()); + } + } + os.close(); + LOG.info("Created a temporary file at " + path + ", " + + fs.getFileStatus(path).getLen() + " byte, compression=" + + compressAlgo); + return totalSize; + } + + @Test + public void testBlockHeapSize() { + for (int size : new int[] { 100, 256, 12345 }) { + byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size]; + ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); + HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, + true, -1); + assertEquals(80, HFileBlock.BYTE_BUFFER_HEAP_SIZE); + long expected = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, + true) + + ClassSize.estimateBase(buf.getClass(), true) + + HFileBlock.HEADER_SIZE + size); + assertEquals(expected, block.heapSize()); + } + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java new file mode 100644 index 00000000000..4dc1367d520 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -0,0 +1,602 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.hfile; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader; +import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class TestHFileBlockIndex { + + @Parameters + public static Collection compressionAlgorithms() { + return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED; + } + + public TestHFileBlockIndex(Compression.Algorithm compr) { + this.compr = compr; + } + + private static final Log LOG = LogFactory.getLog(TestHFileBlockIndex.class); + + private static final int NUM_DATA_BLOCKS = 1000; + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final int SMALL_BLOCK_SIZE = 4096; + private static final int NUM_KV = 10000; + + private static FileSystem fs; + private Path path; + private Random rand; + private long rootIndexOffset; + private int numRootEntries; + private int numLevels; + private static final List keys = new ArrayList(); + private final Compression.Algorithm compr; + private byte[] firstKeyInFile; + private Configuration conf; + + private static final int[] INDEX_CHUNK_SIZES = { 4096, 512, 384 }; + private static final int[] EXPECTED_NUM_LEVELS = { 2, 3, 4 }; + private static final int[] UNCOMPRESSED_INDEX_SIZES = + { 19187, 21813, 23086 }; + + static { + assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length; + assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length; + } + + @Before + public void setUp() throws IOException { + keys.clear(); + rand = new Random(2389757); + firstKeyInFile = null; + conf = TEST_UTIL.getConfiguration(); + + // This test requires at least HFile format version 2. + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); + + fs = FileSystem.get(conf); + } + + @Test + public void testBlockIndex() throws IOException { + path = new Path(HBaseTestingUtility.getTestDir(), "block_index_" + compr); + writeWholeIndex(); + readIndex(); + } + + /** + * A wrapper around a block reader which only caches the results of the last + * operation. Not thread-safe. 
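+ * A repeated call to {@link #readBlockData} with identical arguments is
+ * served from the cached block and counted as a hit; any change in the
+ * arguments is counted as a miss.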
+ */ + private static class BlockReaderWrapper implements HFileBlock.BasicReader { + + private HFileBlock.BasicReader realReader; + private long prevOffset; + private long prevOnDiskSize; + private long prevUncompressedSize; + private boolean prevPread; + private HFileBlock prevBlock; + + public int hitCount = 0; + public int missCount = 0; + + public BlockReaderWrapper(HFileBlock.BasicReader realReader) { + this.realReader = realReader; + } + + @Override + public HFileBlock readBlockData(long offset, long onDiskSize, + int uncompressedSize, boolean pread) throws IOException { + if (offset == prevOffset && onDiskSize == prevOnDiskSize && + uncompressedSize == prevUncompressedSize && pread == prevPread) { + hitCount += 1; + return prevBlock; + } + + missCount += 1; + prevBlock = realReader.readBlockData(offset, onDiskSize, + uncompressedSize, pread); + prevOffset = offset; + prevOnDiskSize = onDiskSize; + prevUncompressedSize = uncompressedSize; + prevPread = pread; + + return prevBlock; + } + } + + public void readIndex() throws IOException { + long fileSize = fs.getFileStatus(path).getLen(); + LOG.info("Size of " + path + ": " + fileSize); + + FSDataInputStream istream = fs.open(path); + HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream, + compr, fs.getFileStatus(path).getLen()); + + BlockReaderWrapper brw = new BlockReaderWrapper(blockReader); + HFileBlockIndex.BlockIndexReader indexReader = + new HFileBlockIndex.BlockIndexReader( + Bytes.BYTES_RAWCOMPARATOR, numLevels, brw); + + indexReader.readRootIndex(blockReader.blockRange(rootIndexOffset, + fileSize).nextBlockAsStream(BlockType.ROOT_INDEX), numRootEntries); + + long prevOffset = -1; + int i = 0; + int expectedHitCount = 0; + int expectedMissCount = 0; + LOG.info("Total number of keys: " + keys.size()); + for (byte[] key : keys) { + assertTrue(key != null); + assertTrue(indexReader != null); + HFileBlock b = indexReader.seekToDataBlock(key, 0, key.length, null); + if (Bytes.BYTES_RAWCOMPARATOR.compare(key, firstKeyInFile) < 0) { + assertTrue(b == null); + ++i; + continue; + } + + String keyStr = "key #" + i + ", " + Bytes.toStringBinary(key); + + assertTrue("seekToDataBlock failed for " + keyStr, b != null); + + if (prevOffset == b.getOffset()) { + assertEquals(++expectedHitCount, brw.hitCount); + } else { + LOG.info("First key in a new block: " + keyStr + ", block offset: " + + b.getOffset() + ")"); + assertTrue(b.getOffset() > prevOffset); + assertEquals(++expectedMissCount, brw.missCount); + prevOffset = b.getOffset(); + } + ++i; + } + + istream.close(); + } + + private void writeWholeIndex() throws IOException { + assertEquals(0, keys.size()); + HFileBlock.Writer hbw = new HFileBlock.Writer(compr); + FSDataOutputStream outputStream = fs.create(path); + HFileBlockIndex.BlockIndexWriter biw = + new HFileBlockIndex.BlockIndexWriter(hbw, null, null); + + for (int i = 0; i < NUM_DATA_BLOCKS; ++i) { + hbw.startWriting(BlockType.DATA, false).write( + String.valueOf(rand.nextInt(1000)).getBytes()); + long blockOffset = outputStream.getPos(); + hbw.writeHeaderAndData(outputStream); + + byte[] firstKey = null; + for (int j = 0; j < 16; ++j) { + byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i * 16 + j); + keys.add(k); + if (j == 8) + firstKey = k; + } + assertTrue(firstKey != null); + if (firstKeyInFile == null) + firstKeyInFile = firstKey; + biw.addEntry(firstKey, blockOffset, hbw.getOnDiskSizeWithHeader()); + + writeInlineBlocks(hbw, outputStream, biw, false); + } + writeInlineBlocks(hbw, outputStream, biw, true); 
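+ // writeIndexBlocks() should write any intermediate index levels plus the
+ // root index block and return the root index offset, which readIndex()
+ // relies on.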
+ rootIndexOffset = biw.writeIndexBlocks(outputStream); + outputStream.close(); + + numLevels = biw.getNumLevels(); + numRootEntries = biw.getNumRootEntries(); + + LOG.info("Index written: numLevels=" + numLevels + ", numRootEntries=" + + numRootEntries + ", rootIndexOffset=" + rootIndexOffset); + } + + private void writeInlineBlocks(HFileBlock.Writer hbw, + FSDataOutputStream outputStream, HFileBlockIndex.BlockIndexWriter biw, + boolean isClosing) throws IOException { + while (biw.shouldWriteBlock(isClosing)) { + long offset = outputStream.getPos(); + biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false)); + hbw.writeHeaderAndData(outputStream); + biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(), + hbw.getUncompressedSizeWithoutHeader()); + LOG.info("Wrote an inline index block at " + offset + ", size " + + hbw.getOnDiskSizeWithHeader()); + } + } + + private static final long getDummyFileOffset(int i) { + return i * 185 + 379; + } + + private static final int getDummyOnDiskSize(int i) { + return i * i * 37 + i * 19 + 13; + } + + @Test + public void testSecondaryIndexBinarySearch() throws IOException { + int numTotalKeys = 99; + assertTrue(numTotalKeys % 2 == 1); // Ensure no one made this even. + + // We only add odd-index keys into the array that we will binary-search. + int numSearchedKeys = (numTotalKeys - 1) / 2; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + dos.writeInt(numSearchedKeys); + int curAllEntriesSize = 0; + int numEntriesAdded = 0; + + // Only odd-index elements of this array are used to keep the secondary + // index entries of the corresponding keys. + int secondaryIndexEntries[] = new int[numTotalKeys]; + + for (int i = 0; i < numTotalKeys; ++i) { + byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i * 2); + keys.add(k); + String msgPrefix = "Key #" + i + " (" + Bytes.toStringBinary(k) + "): "; + StringBuilder padding = new StringBuilder(); + while (msgPrefix.length() + padding.length() < 70) + padding.append(' '); + msgPrefix += padding; + if (i % 2 == 1) { + dos.writeInt(curAllEntriesSize); + secondaryIndexEntries[i] = curAllEntriesSize; + LOG.info(msgPrefix + "secondary index entry #" + ((i - 1) / 2) + + ", offset " + curAllEntriesSize); + curAllEntriesSize += k.length + + HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD; + ++numEntriesAdded; + } else { + secondaryIndexEntries[i] = -1; + LOG.info(msgPrefix + "not in the searched array"); + } + } + + // Make sure the keys are increasing. 
+ for (int i = 0; i < keys.size() - 1; ++i) + assertTrue(Bytes.BYTES_RAWCOMPARATOR.compare(keys.get(i), + keys.get(i + 1)) < 0); + + dos.writeInt(curAllEntriesSize); + assertEquals(numSearchedKeys, numEntriesAdded); + int secondaryIndexOffset = dos.size(); + assertEquals(Bytes.SIZEOF_INT * (numSearchedKeys + 2), + secondaryIndexOffset); + + for (int i = 1; i <= numTotalKeys - 1; i += 2) { + assertEquals(dos.size(), + secondaryIndexOffset + secondaryIndexEntries[i]); + long dummyFileOffset = getDummyFileOffset(i); + int dummyOnDiskSize = getDummyOnDiskSize(i); + LOG.debug("Storing file offset=" + dummyFileOffset + " and onDiskSize=" + + dummyOnDiskSize + " at offset " + dos.size()); + dos.writeLong(dummyFileOffset); + dos.writeInt(dummyOnDiskSize); + LOG.debug("Stored key " + ((i - 1) / 2) +" at offset " + dos.size()); + dos.write(keys.get(i)); + } + + dos.writeInt(curAllEntriesSize); + + ByteBuffer nonRootIndex = ByteBuffer.wrap(baos.toByteArray()); + for (int i = 0; i < numTotalKeys; ++i) { + byte[] searchKey = keys.get(i); + byte[] arrayHoldingKey = new byte[searchKey.length + + searchKey.length / 2]; + + // To make things a bit more interesting, store the key we are looking + // for at a non-zero offset in a new array. + System.arraycopy(searchKey, 0, arrayHoldingKey, searchKey.length / 2, + searchKey.length); + + int searchResult = BlockIndexReader.binarySearchNonRootIndex( + arrayHoldingKey, searchKey.length / 2, searchKey.length, nonRootIndex, + Bytes.BYTES_RAWCOMPARATOR); + String lookupFailureMsg = "Failed to look up key #" + i + " (" + + Bytes.toStringBinary(searchKey) + ")"; + + int expectedResult; + int referenceItem; + + if (i % 2 == 1) { + // This key is in the array we search as the element (i - 1) / 2. Make + // sure we find it. + expectedResult = (i - 1) / 2; + referenceItem = i; + } else { + // This key is not in the array but between two elements on the array, + // in the beginning, or in the end. The result should be the previous + // key in the searched array, or -1 for i = 0. 
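+ // For example, for i = 4 the closest preceding searched key is key #3,
+ // which is element (4 / 2) - 1 = 1 of the searched array.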
+ expectedResult = i / 2 - 1; + referenceItem = i - 1; + } + + assertEquals(lookupFailureMsg, expectedResult, searchResult); + + // Now test we can get the offset and the on-disk-size using a + // higher-level API function.s + boolean locateBlockResult = + BlockIndexReader.locateNonRootIndexEntry(nonRootIndex, arrayHoldingKey, + searchKey.length / 2, searchKey.length, Bytes.BYTES_RAWCOMPARATOR); + + if (i == 0) { + assertFalse(locateBlockResult); + } else { + assertTrue(locateBlockResult); + String errorMsg = "i=" + i + ", position=" + nonRootIndex.position(); + assertEquals(errorMsg, getDummyFileOffset(referenceItem), + nonRootIndex.getLong()); + assertEquals(errorMsg, getDummyOnDiskSize(referenceItem), + nonRootIndex.getInt()); + } + } + + } + + @Test + public void testBlockIndexChunk() throws IOException { + BlockIndexChunk c = new BlockIndexChunk(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int N = 1000; + int[] numSubEntriesAt = new int[N]; + int numSubEntries = 0; + for (int i = 0; i < N; ++i) { + baos.reset(); + DataOutputStream dos = new DataOutputStream(baos); + c.writeNonRoot(dos); + assertEquals(c.getNonRootSize(), dos.size()); + + baos.reset(); + dos = new DataOutputStream(baos); + c.writeRoot(dos); + assertEquals(c.getRootSize(), dos.size()); + + byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i); + numSubEntries += rand.nextInt(5) + 1; + keys.add(k); + c.add(k, getDummyFileOffset(i), getDummyOnDiskSize(i), numSubEntries); + } + + // Test the ability to look up the entry that contains a particular + // deeper-level index block's entry ("sub-entry"), assuming a global + // 0-based ordering of sub-entries. This is needed for mid-key calculation. + for (int i = 0; i < N; ++i) { + for (int j = i == 0 ? 0 : numSubEntriesAt[i - 1]; + j < numSubEntriesAt[i]; + ++j) { + assertEquals(i, c.getEntryBySubEntry(j)); + } + } + } + + /** Checks if the HeapSize calculator is within reason */ + @Test + public void testHeapSizeForBlockIndex() throws IOException { + Class cl = + HFileBlockIndex.BlockIndexReader.class; + long expected = ClassSize.estimateBase(cl, false); + + HFileBlockIndex.BlockIndexReader bi = + new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1); + long actual = bi.heapSize(); + + // Since the arrays in BlockIndex(byte [][] blockKeys, long [] blockOffsets, + // int [] blockDataSizes) are all null they are not going to show up in the + // HeapSize calculation, so need to remove those array costs from expected. + expected -= ClassSize.align(3 * ClassSize.ARRAY); + + if (expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + } + + /** + * Testing block index through the HFile writer/reader APIs. Allows to test + * setting index block size through configuration, intermediate-level index + * blocks, and caching index blocks on write. + * + * @throws IOException + */ + @Test + public void testHFileWriterAndReader() throws IOException { + Path hfilePath = new Path(HBaseTestingUtility.getTestDir(), + "hfile_for_block_index"); + BlockCache blockCache = StoreFile.getBlockCache(conf); + + for (int testI = 0; testI < INDEX_CHUNK_SIZES.length; ++testI) { + int indexBlockSize = INDEX_CHUNK_SIZES[testI]; + int expectedNumLevels = EXPECTED_NUM_LEVELS[testI]; + LOG.info("Index block size: " + indexBlockSize + ", compression: " + + compr); + // Evict all blocks that were cached-on-write by the previous invocation. 
+ blockCache.evictBlocksByPrefix(hfilePath.getName() + + HFile.CACHE_KEY_SEPARATOR); + + conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize); + Set keyStrSet = new HashSet(); + byte[][] keys = new byte[NUM_KV][]; + byte[][] values = new byte[NUM_KV][]; + + // Write the HFile + { + HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fs, + hfilePath, SMALL_BLOCK_SIZE, compr, KeyValue.KEY_COMPARATOR); + Random rand = new Random(19231737); + + for (int i = 0; i < NUM_KV; ++i) { + byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i); + + // Key will be interpreted by KeyValue.KEY_COMPARATOR + byte[] k = KeyValue.createFirstOnRow(row, 0, row.length, row, 0, 0, + row, 0, 0).getKey(); + + byte[] v = TestHFileWriterV2.randomValue(rand); + writer.append(k, v); + keys[i] = k; + values[i] = v; + keyStrSet.add(Bytes.toStringBinary(k)); + + if (i > 0) { + assertTrue(KeyValue.KEY_COMPARATOR.compare(keys[i - 1], + keys[i]) < 0); + } + } + + writer.close(); + } + + // Read the HFile + HFile.Reader reader = HFile.createReader(fs, hfilePath, blockCache, + false, true); + assertEquals(expectedNumLevels, + reader.getTrailer().getNumDataIndexLevels()); + + assertTrue(Bytes.equals(keys[0], reader.getFirstKey())); + assertTrue(Bytes.equals(keys[NUM_KV - 1], reader.getLastKey())); + LOG.info("Last key: " + Bytes.toStringBinary(keys[NUM_KV - 1])); + + for (boolean pread : new boolean[] { false, true }) { + HFileScanner scanner = reader.getScanner(true, pread); + for (int i = 0; i < NUM_KV; ++i) { + checkSeekTo(keys, scanner, i); + checkKeyValue("i=" + i, keys[i], values[i], scanner.getKey(), + scanner.getValue()); + } + assertTrue(scanner.seekTo()); + for (int i = NUM_KV - 1; i >= 0; --i) { + checkSeekTo(keys, scanner, i); + checkKeyValue("i=" + i, keys[i], values[i], scanner.getKey(), + scanner.getValue()); + } + } + + // Manually compute the mid-key and validate it. + HFileReaderV2 reader2 = (HFileReaderV2) reader; + HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader(); + + HFileBlock.BlockIterator iter = fsReader.blockRange(0, + reader.getTrailer().getLoadOnOpenDataOffset()); + HFileBlock block; + List blockKeys = new ArrayList(); + while ((block = iter.nextBlock()) != null) { + if (block.getBlockType() != BlockType.LEAF_INDEX) + return; + ByteBuffer b = block.getBufferReadOnly(); + int n = b.getInt(); + // One int for the number of items, and n + 1 for the secondary index. + int entriesOffset = Bytes.SIZEOF_INT * (n + 2); + + // Get all the keys from the leaf index block. S + for (int i = 0; i < n; ++i) { + int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 1)); + int nextKeyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 2)); + int keyLen = nextKeyRelOffset - keyRelOffset; + int keyOffset = b.arrayOffset() + entriesOffset + keyRelOffset + + HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD; + byte[] blockKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + + keyLen); + String blockKeyStr = Bytes.toString(blockKey); + blockKeys.add(blockKey); + + // If the first key of the block is not among the keys written, we + // are not parsing the non-root index block format correctly. + assertTrue("Invalid block key from leaf-level block: " + blockKeyStr, + keyStrSet.contains(blockKeyStr)); + } + } + + // Validate the mid-key. 
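+ // (blockKeys should now hold the first key of each data block, so the
+ // reader's midkey() is expected to match its middle element.)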
+ assertEquals( + Bytes.toStringBinary(blockKeys.get((blockKeys.size() - 1) / 2)), + Bytes.toStringBinary(reader.midkey())); + + assertEquals(UNCOMPRESSED_INDEX_SIZES[testI], + reader.getTrailer().getUncompressedDataIndexSize()); + + reader.close(); + } + } + + private void checkSeekTo(byte[][] keys, HFileScanner scanner, int i) + throws IOException { + assertEquals("Failed to seek to key #" + i + " (" + + Bytes.toStringBinary(keys[i]) + ")", 0, scanner.seekTo(keys[i])); + } + + private void assertArrayEqualsBuffer(String msgPrefix, byte[] arr, + ByteBuffer buf) { + assertEquals(msgPrefix + ": expected " + Bytes.toStringBinary(arr) + + ", actual " + Bytes.toStringBinary(buf), 0, Bytes.compareTo(arr, 0, + arr.length, buf.array(), buf.arrayOffset(), buf.limit())); + } + + /** Check a key/value pair after it was read by the reader */ + private void checkKeyValue(String msgPrefix, byte[] expectedKey, + byte[] expectedValue, ByteBuffer keyRead, ByteBuffer valueRead) { + if (!msgPrefix.isEmpty()) + msgPrefix += ". "; + + assertArrayEqualsBuffer(msgPrefix + "Invalid key", expectedKey, keyRead); + assertArrayEqualsBuffer(msgPrefix + "Invalid value", expectedValue, + valueRead); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java new file mode 100644 index 00000000000..249ddb0a169 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java @@ -0,0 +1,89 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.hfile; + +import java.io.IOException; +import java.net.URL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestHFileReaderV1 { + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private Configuration conf; + private FileSystem fs; + + private static final int N = 1000; + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + fs = FileSystem.get(conf); + } + + @Test + public void testReadingExistingVersion1HFile() throws IOException { + URL url = TestHFileReaderV1.class.getResource( + "8e8ab58dcf39412da19833fcd8f687ac"); + Path existingHFilePath = new Path(url.getPath()); + HFile.Reader reader = + HFile.createReader(fs, existingHFilePath, null, false, false); + reader.loadFileInfo(); + FixedFileTrailer trailer = reader.getTrailer(); + + assertEquals(N, reader.getEntries()); + assertEquals(N, trailer.getEntryCount()); + assertEquals(1, trailer.getVersion()); + assertEquals(Compression.Algorithm.GZ, trailer.getCompressionCodec()); + + for (boolean pread : new boolean[] { false, true }) { + int totalDataSize = 0; + int n = 0; + + HFileScanner scanner = reader.getScanner(false, pread); + assertTrue(scanner.seekTo()); + do { + totalDataSize += scanner.getKey().limit() + scanner.getValue().limit() + + Bytes.SIZEOF_INT * 2; + ++n; + } while (scanner.next()); + + // Add magic record sizes, one per data block. + totalDataSize += 8 * trailer.getDataIndexCount(); + + assertEquals(N, n); + assertEquals(trailer.getTotalUncompressedBytes(), totalDataSize); + } + reader.close(); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java new file mode 100644 index 00000000000..6a3db20c52e --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -0,0 +1,256 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; + +/** + * Testing writing a version 2 {@link HFile}. This is a low-level test written + * during the development of {@link HFileWriterV2}. + */ +public class TestHFileWriterV2 { + + private static final Log LOG = LogFactory.getLog(TestHFileWriterV2.class); + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private Configuration conf; + private FileSystem fs; + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + fs = FileSystem.get(conf); + } + + @Test + public void testHFileFormatV2() throws IOException { + Path hfilePath = new Path(HBaseTestingUtility.getTestDir(), + "testHFileFormatV2"); + + final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ; + HFileWriterV2 writer = new HFileWriterV2(conf, fs, hfilePath, 4096, + COMPRESS_ALGO, KeyValue.KEY_COMPARATOR); + + long totalKeyLength = 0; + long totalValueLength = 0; + + Random rand = new Random(9713312); // Just a fixed seed. + + final int ENTRY_COUNT = 10000; + List keys = new ArrayList(); + List values = new ArrayList(); + + for (int i = 0; i < ENTRY_COUNT; ++i) { + byte[] keyBytes = randomOrderedKey(rand, i); + + // A random-length random value. + byte[] valueBytes = randomValue(rand); + writer.append(keyBytes, valueBytes); + + totalKeyLength += keyBytes.length; + totalValueLength += valueBytes.length; + + keys.add(keyBytes); + values.add(valueBytes); + } + + // Add in an arbitrary order. They will be sorted lexicographically by + // the key. + writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C.")); + writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow")); + writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris")); + + writer.close(); + + FSDataInputStream fsdis = fs.open(hfilePath); + + // A "manual" version of a new-format HFile reader. This unit test was + // written before the V2 reader was fully implemented. 
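+ // Rough layout walked below: the trailer is read via readFromStream(),
+ // then data blocks are scanned from offset 0 through the last data block
+ // offset, then meta blocks up to the load-on-open offset.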
+ + long fileSize = fs.getFileStatus(hfilePath).getLen(); + FixedFileTrailer trailer = + FixedFileTrailer.readFromStream(fsdis, fileSize); + + assertEquals(2, trailer.getVersion()); + assertEquals(ENTRY_COUNT, trailer.getEntryCount()); + + HFileBlock.FSReader blockReader = + new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize); + + // Counters for the number of key/value pairs and the number of blocks + int entriesRead = 0; + int blocksRead = 0; + + // Scan blocks the way the reader would scan them + fsdis.seek(0); + long curBlockPos = 0; + while (curBlockPos <= trailer.getLastDataBlockOffset()) { + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + assertEquals(BlockType.DATA, block.getBlockType()); + ByteBuffer buf = block.getBufferWithoutHeader(); + while (buf.hasRemaining()) { + int keyLen = buf.getInt(); + int valueLen = buf.getInt(); + + byte[] key = new byte[keyLen]; + buf.get(key); + + byte[] value = new byte[valueLen]; + buf.get(value); + + // A brute-force check to see that all keys and values are correct. + assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0); + assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0); + + ++entriesRead; + } + ++blocksRead; + curBlockPos += block.getOnDiskSizeWithHeader(); + } + LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead=" + + blocksRead); + assertEquals(ENTRY_COUNT, entriesRead); + + // Meta blocks. We can scan until the load-on-open data offset (which is + // the root block index offset in version 2) because we are not testing + // intermediate-level index blocks here. + + int metaCounter = 0; + while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { + LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + + trailer.getLoadOnOpenDataOffset()); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + assertEquals(BlockType.META, block.getBlockType()); + Text t = new Text(); + block.readInto(t); + Text expectedText = + (metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text( + "Moscow") : new Text("Washington, D.C.")); + assertEquals(expectedText, t); + LOG.info("Read meta block data: " + t); + ++metaCounter; + curBlockPos += block.getOnDiskSizeWithHeader(); + } + + fsdis.close(); + } + + // Static stuff used by various HFile v2 unit tests + + private static final String COLUMN_FAMILY_NAME = "_-myColumnFamily-_"; + private static final int MIN_ROW_OR_QUALIFIER_LENGTH = 64; + private static final int MAX_ROW_OR_QUALIFIER_LENGTH = 128; + + /** + * Generates a random key that is guaranteed to increase as the given index i + * increases. The result consists of a prefix, which is a deterministic + * increasing function of i, and a random suffix. + * + * @param rand + * random number generator to use + * @param i + * @return + */ + public static byte[] randomOrderedKey(Random rand, int i) { + StringBuilder k = new StringBuilder(); + + // The fixed-length lexicographically increasing part of the key. + for (int bitIndex = 31; bitIndex >= 0; --bitIndex) { + if ((i & (1 << bitIndex)) == 0) + k.append("a"); + else + k.append("b"); + } + + // A random-length random suffix of the key. 
+ for (int j = 0; j < rand.nextInt(50); ++j) + k.append(randomReadableChar(rand)); + + byte[] keyBytes = k.toString().getBytes(); + return keyBytes; + } + + public static byte[] randomValue(Random rand) { + StringBuilder v = new StringBuilder(); + for (int j = 0; j < 1 + rand.nextInt(2000); ++j) { + v.append((char) (32 + rand.nextInt(95))); + } + + byte[] valueBytes = v.toString().getBytes(); + return valueBytes; + } + + public static final char randomReadableChar(Random rand) { + int i = rand.nextInt(26 * 2 + 10 + 1); + if (i < 26) + return (char) ('A' + i); + i -= 26; + + if (i < 26) + return (char) ('a' + i); + i -= 26; + + if (i < 10) + return (char) ('0' + i); + i -= 10; + + assert i == 0; + return '_'; + } + + public static byte[] randomRowOrQualifier(Random rand) { + StringBuilder field = new StringBuilder(); + int fieldLen = MIN_ROW_OR_QUALIFIER_LENGTH + + rand.nextInt(MAX_ROW_OR_QUALIFIER_LENGTH + - MIN_ROW_OR_QUALIFIER_LENGTH + 1); + for (int i = 0; i < fieldLen; ++i) + field.append(randomReadableChar(rand)); + return field.toString().getBytes(); + } + + public static KeyValue randomKeyValue(Random rand) { + return new KeyValue(randomRowOrQualifier(rand), + COLUMN_FAMILY_NAME.getBytes(), randomRowOrQualifier(rand), + randomValue(rand)); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java new file mode 100644 index 00000000000..48e9163aea2 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java @@ -0,0 +1,353 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.ByteBloomFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CompoundBloomFilter; +import org.apache.hadoop.hbase.util.CompoundBloomFilterBase; +import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests writing Bloom filter blocks in the same part of the file as data + * blocks. + */ +public class TestCompoundBloomFilter { + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private static final Log LOG = LogFactory.getLog( + TestCompoundBloomFilter.class); + + private static final int NUM_TESTS = 9; + private static final BloomType BLOOM_TYPES[] = { BloomType.ROW, + BloomType.ROW, BloomType.ROWCOL, BloomType.ROWCOL, BloomType.ROW, + BloomType.ROWCOL, BloomType.ROWCOL, BloomType.ROWCOL, BloomType.ROW }; + + private static final int NUM_KV[]; + static { + final int N = 10000; // Only used in initialization. + NUM_KV = new int[] { 21870, N, N, N, N, 1000, N, 7500, 7500}; + assert NUM_KV.length == NUM_TESTS; + } + + private static final int BLOCK_SIZES[]; + static { + final int blkSize = 65536; + BLOCK_SIZES = new int[] { 512, 1000, blkSize, blkSize, blkSize, 128, 300, + blkSize, blkSize }; + assert BLOCK_SIZES.length == NUM_TESTS; + } + + /** + * Be careful not to specify too high a Bloom filter block size, otherwise + * there will only be one oversized chunk and the observed false positive + * rate will be too low. + */ + private static final int BLOOM_BLOCK_SIZES[] = { 1000, 4096, 4096, 4096, + 8192, 128, 1024, 600, 600 }; + static { assert BLOOM_BLOCK_SIZES.length == NUM_TESTS; } + + private static final double TARGET_ERROR_RATES[] = { 0.025, 0.01, 0.015, + 0.01, 0.03, 0.01, 0.01, 0.07, 0.07 }; + static { assert TARGET_ERROR_RATES.length == NUM_TESTS; } + + /** A false positive rate that is obviously too high. */ + private static final double TOO_HIGH_ERROR_RATE; + static { + double m = 0; + for (double errorRate : TARGET_ERROR_RATES) + m = Math.max(m, errorRate); + TOO_HIGH_ERROR_RATE = m + 0.03; + } + + private static Configuration conf; + private FileSystem fs; + private BlockCache blockCache; + + /** A message of the form "in test#:" to include in logging. */ + private String testIdMsg; + + private static final int GENERATION_SEED = 2319; + private static final int EVALUATION_SEED = 135; + + @Before + public void setUp() throws IOException { + conf = TEST_UTIL.getConfiguration(); + + // This test requires the most recent HFile format (i.e. v2). 
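+    // Compound Bloom filter chunks are written in the same part of the file
+    // as the data blocks (see the class comment above), which only the v2
+    // block layout supports, so force the newest format version.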
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
+
+    fs = FileSystem.get(conf);
+
+    blockCache = StoreFile.getBlockCache(conf);
+    assertNotNull(blockCache);
+  }
+
+  private List<KeyValue> createSortedKeyValues(Random rand, int n) {
+    List<KeyValue> kvList = new ArrayList<KeyValue>(n);
+    for (int i = 0; i < n; ++i)
+      kvList.add(TestHFileWriterV2.randomKeyValue(rand));
+    Collections.sort(kvList, KeyValue.COMPARATOR);
+    return kvList;
+  }
+
+  @Test
+  public void testCompoundBloomFilter() throws IOException {
+    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
+    for (int t = 0; t < NUM_TESTS; ++t) {
+      conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE,
+          (float) TARGET_ERROR_RATES[t]);
+
+      testIdMsg = "in test #" + t + ":";
+      Random generationRand = new Random(GENERATION_SEED);
+      List<KeyValue> kvs = createSortedKeyValues(generationRand, NUM_KV[t]);
+      BloomType bt = BLOOM_TYPES[t];
+      Path sfPath = writeStoreFile(t, bt, kvs);
+      readStoreFile(t, bt, kvs, sfPath);
+    }
+  }
+
+  /**
+   * Validates the false positive ratio by computing its z-value and comparing
+   * it to the provided threshold.
+   *
+   * @param falsePosRate experimentally measured false positive rate
+   * @param nTrials the number of calls to
+   *          {@link StoreFile.Reader#shouldSeek(Scan, java.util.SortedSet)}.
+   * @param zValueBoundary z-value boundary, positive for an upper bound and
+   *          negative for a lower bound
+   * @param cbf the compound Bloom filter we are using
+   * @param additionalMsg additional message to include in log output and
+   *          assertion failures
+   */
+  private void validateFalsePosRate(double falsePosRate, int nTrials,
+      double zValueBoundary, CompoundBloomFilter cbf, String additionalMsg) {
+    double p = BloomFilterFactory.getErrorRate(conf);
+    double zValue = (falsePosRate - p) / Math.sqrt(p * (1 - p) / nTrials);
+
+    String assortedStatsStr = " (targetErrorRate=" + p + ", falsePosRate="
+        + falsePosRate + ", nTrials=" + nTrials + ")";
+    LOG.info("z-value is " + zValue + assortedStatsStr);
+
+    boolean isUpperBound = zValueBoundary > 0;
+
+    if (isUpperBound && zValue > zValueBoundary ||
+        !isUpperBound && zValue < zValueBoundary) {
+      String errorMsg = "False positive rate z-value " + zValue + " is "
+          + (isUpperBound ? "higher" : "lower") + " than " + zValueBoundary
+          + assortedStatsStr + ". Per-chunk stats:\n"
+          + cbf.formatTestingStats();
+      fail(errorMsg + additionalMsg);
+    }
+  }
+
+  private void readStoreFile(int t, BloomType bt, List<KeyValue> kvs,
+      Path sfPath) throws IOException {
+    StoreFile sf = new StoreFile(fs, sfPath, true, conf, bt, false);
+    StoreFile.Reader r = sf.createReader();
+    final boolean pread = true; // does not really matter
+    StoreFileScanner scanner = r.getStoreFileScanner(true, pread);
+
+    {
+      // Test for false negatives (not allowed).
+      int numChecked = 0;
+      for (KeyValue kv : kvs) {
+        byte[] row = kv.getRow();
+        boolean present = isInBloom(scanner, row, kv.getQualifier());
+        assertTrue(testIdMsg + " Bloom filter false negative on row "
+            + Bytes.toStringBinary(row) + " after " + numChecked
+            + " successful checks", present);
+        ++numChecked;
+      }
+    }
+
+    // Test for false positives (some percentage allowed). We test in two
+    // modes: "fake lookup", which ignores the key distribution, and
+    // production mode.
+    for (boolean fakeLookupEnabled : new boolean[] { true, false }) {
+      ByteBloomFilter.setFakeLookupMode(fakeLookupEnabled);
+      try {
+        String fakeLookupModeStr = ", fake lookup is "
+            + (fakeLookupEnabled ? "enabled" : "disabled");
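+        // The store file was written with a CompoundBloomFilterWriter (this
+        // is asserted in writeStoreFile), so the general Bloom filter read
+        // back here is expected to be a CompoundBloomFilter.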
+        CompoundBloomFilter cbf = (CompoundBloomFilter) r.getBloomFilter();
+        cbf.enableTestingStats();
+        int numFalsePos = 0;
+        Random rand = new Random(EVALUATION_SEED);
+        int nTrials = NUM_KV[t] * 10;
+        for (int i = 0; i < nTrials; ++i) {
+          byte[] query = TestHFileWriterV2.randomRowOrQualifier(rand);
+          if (isInBloom(scanner, query, bt, rand)) {
+            numFalsePos += 1;
+          }
+        }
+        double falsePosRate = numFalsePos * 1.0 / nTrials;
+        LOG.debug(String.format(testIdMsg
+            + " False positives: %d out of %d (%f)",
+            numFalsePos, nTrials, falsePosRate) + fakeLookupModeStr);
+
+        // A coarse sanity check: the false positive rate must not be
+        // obviously too high.
+        assertTrue("False positive rate is too high: " + falsePosRate
+            + " (greater than " + TOO_HIGH_ERROR_RATE + ")"
+            + fakeLookupModeStr,
+            falsePosRate < TOO_HIGH_ERROR_RATE);
+
+        // Now a more precise check that the false positive rate is not too
+        // high. We use a more relaxed bound for the real-world case than for
+        // the "fake lookup" case because our hash functions are not
+        // completely independent.
+
+        double maxZValue = fakeLookupEnabled ? 1.96 : 2.5;
+        validateFalsePosRate(falsePosRate, nTrials, maxZValue, cbf,
+            fakeLookupModeStr);
+
+        // For checking the lower bound we need to eliminate the last chunk,
+        // because it is frequently smaller and the false positive rate in it
+        // is too low. This does not help if there is only one under-sized
+        // chunk, though.
+        int nChunks = cbf.getNumChunks();
+        if (nChunks > 1) {
+          numFalsePos -= cbf.getNumPositivesForTesting(nChunks - 1);
+          nTrials -= cbf.getNumQueriesForTesting(nChunks - 1);
+          falsePosRate = numFalsePos * 1.0 / nTrials;
+          LOG.info(testIdMsg + " False positive rate without last chunk is "
+              + falsePosRate + fakeLookupModeStr);
+        }
+
+        validateFalsePosRate(falsePosRate, nTrials, -2.58, cbf,
+            fakeLookupModeStr);
+      } finally {
+        ByteBloomFilter.setFakeLookupMode(false);
+      }
+    }
+
+    r.close();
+  }
+
+  private boolean isInBloom(StoreFileScanner scanner, byte[] row, BloomType bt,
+      Random rand) {
+    return isInBloom(scanner, row,
+        TestHFileWriterV2.randomRowOrQualifier(rand));
+  }
+
+  private boolean isInBloom(StoreFileScanner scanner, byte[] row,
+      byte[] qualifier) {
+    Scan scan = new Scan(row, row);
+    TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    columns.add(qualifier);
+    return scanner.shouldSeek(scan, columns);
+  }
+
+  private Path writeStoreFile(int t, BloomType bt, List<KeyValue> kvs)
+      throws IOException {
+    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
+        BLOOM_BLOCK_SIZES[t]);
+    conf.setBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY, true);
+
+    StoreFile.Writer w = StoreFile.createWriter(fs,
+        HBaseTestingUtility.getTestDir(), BLOCK_SIZES[t], null, null, conf,
+        bt, 0);
+
+    assertTrue(w.hasBloom());
+    assertTrue(w.getBloomWriter() instanceof CompoundBloomFilterWriter);
+    CompoundBloomFilterWriter cbbf =
+        (CompoundBloomFilterWriter) w.getBloomWriter();
+
+    int keyCount = 0;
+    KeyValue prev = null;
+    LOG.debug("Total keys/values to insert: " + kvs.size());
+    for (KeyValue kv : kvs) {
+      w.append(kv);
+
+      // Validate the key count in the Bloom filter.
+      boolean newKey = true;
+      if (prev != null) {
+        newKey = !(bt == BloomType.ROW ?
KeyValue.COMPARATOR.matchingRows(kv, + prev) : KeyValue.COMPARATOR.matchingRowColumn(kv, prev)); + } + if (newKey) + ++keyCount; + assertEquals(keyCount, cbbf.getKeyCount()); + + prev = kv; + } + w.close(); + + return w.getPath(); + } + + @Test + public void testCompoundBloomSizing() { + int bloomBlockByteSize = 4096; + int bloomBlockBitSize = bloomBlockByteSize * 8; + double targetErrorRate = 0.01; + long maxKeysPerChunk = ByteBloomFilter.idealMaxKeys(bloomBlockBitSize, + targetErrorRate); + + long bloomSize1 = bloomBlockByteSize * 8; + long bloomSize2 = ByteBloomFilter.computeBitSize(maxKeysPerChunk, + targetErrorRate); + + double bloomSizeRatio = (bloomSize2 * 1.0 / bloomSize1); + assertTrue(Math.abs(bloomSizeRatio - 0.9999) < 0.0001); + } + + @Test + public void testCreateKey() { + CompoundBloomFilterBase cbfb = new CompoundBloomFilterBase(); + byte[] row = "myRow".getBytes(); + byte[] qualifier = "myQualifier".getBytes(); + byte[] rowKey = cbfb.createBloomKey(row, 0, row.length, + row, 0, 0); + byte[] rowColKey = cbfb.createBloomKey(row, 0, row.length, + qualifier, 0, qualifier.length); + KeyValue rowKV = KeyValue.createKeyValueFromKey(rowKey); + KeyValue rowColKV = KeyValue.createKeyValueFromKey(rowColKey); + assertEquals(rowKV.getTimestamp(), rowColKV.getTimestamp()); + assertEquals(Bytes.toStringBinary(rowKV.getRow()), + Bytes.toStringBinary(rowColKV.getRow())); + assertEquals(0, rowKV.getQualifier().length); + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java b/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java new file mode 100644 index 00000000000..a55c2128126 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java @@ -0,0 +1,111 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class TestIdLock {
+
+  private static final Log LOG = LogFactory.getLog(TestIdLock.class);
+
+  private static final int NUM_IDS = 16;
+  private static final int NUM_THREADS = 128;
+  private static final int NUM_SECONDS = 20;
+
+  private IdLock idLock = new IdLock();
+
+  private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
+
+  private class IdLockTestThread implements Callable<Boolean> {
+
+    private String clientId;
+
+    public IdLockTestThread(String clientId) {
+      this.clientId = clientId;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      Thread.currentThread().setName(clientId);
+      Random rand = new Random();
+      long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
+      while (System.currentTimeMillis() < endTime) {
+        long id = rand.nextInt(NUM_IDS);
+
+        LOG.info(clientId + " is waiting for id " + id);
+        IdLock.Entry lockEntry = idLock.getLockEntry(id);
+        try {
+          int sleepMs = 1 + rand.nextInt(4);
+          String owner = idOwner.get(id);
+          if (owner != null) {
+            LOG.error("Id " + id + " already taken by " + owner + ", "
+                + clientId + " failed");
+            return false;
+          }
+
+          idOwner.put(id, clientId);
+          LOG.info(clientId + " took id " + id + ", sleeping for "
+              + sleepMs + "ms");
+          Thread.sleep(sleepMs);
+          LOG.info(clientId + " is releasing id " + id);
+          idOwner.remove(id);
+
+        } finally {
+          idLock.releaseLockEntry(lockEntry);
+        }
+      }
+      return true;
+    }
+
+  }
+
+  @Test
+  public void testMultipleClients() throws Exception {
+    ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+    try {
+      ExecutorCompletionService<Boolean> ecs =
+          new ExecutorCompletionService<Boolean>(exec);
+      for (int i = 0; i < NUM_THREADS; ++i)
+        ecs.submit(new IdLockTestThread("client_" + i));
+      for (int i = 0; i < NUM_THREADS; ++i) {
+        Future<Boolean> result = ecs.take();
+        assertTrue(result.get());
+      }
+      idLock.assertMapEmpty();
+    } finally {
+      exec.shutdown();
+    }
+  }
+
+}