HBASE-3857 New test classes.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153647 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c30bce0eaa
commit
d2c1d20bfc
|
@ -0,0 +1,227 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestFixedFileTrailer {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestFixedFileTrailer.class);
|
||||
|
||||
/** The number of used fields by version. Indexed by version minus one. */
|
||||
private static final int[] NUM_FIELDS_BY_VERSION = new int[] { 8, 13 };
|
||||
|
||||
private HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
private FileSystem fs;
|
||||
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
private int version;
|
||||
|
||||
static {
|
||||
assert NUM_FIELDS_BY_VERSION.length == HFile.MAX_FORMAT_VERSION
|
||||
- HFile.MIN_FORMAT_VERSION + 1;
|
||||
}
|
||||
|
||||
public TestFixedFileTrailer(int version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> getParameters() {
|
||||
List<Object[]> versionsToTest = new ArrayList<Object[]>();
|
||||
for (int v = HFile.MIN_FORMAT_VERSION; v <= HFile.MAX_FORMAT_VERSION; ++v)
|
||||
versionsToTest.add(new Integer[] { v } );
|
||||
return versionsToTest;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
fs = FileSystem.get(util.getConfiguration());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTrailer() throws IOException {
|
||||
FixedFileTrailer t = new FixedFileTrailer(version);
|
||||
t.setDataIndexCount(3);
|
||||
t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
|
||||
|
||||
if (version == 1) {
|
||||
t.setFileInfoOffset(876);
|
||||
}
|
||||
|
||||
if (version == 2) {
|
||||
t.setLastDataBlockOffset(291);
|
||||
t.setNumDataIndexLevels(3);
|
||||
t.setComparatorClass(KeyValue.KEY_COMPARATOR.getClass());
|
||||
t.setFirstDataBlockOffset(9081723123L); // Completely unrealistic.
|
||||
t.setUncompressedDataIndexSize(827398717L); // Something random.
|
||||
}
|
||||
|
||||
t.setLoadOnOpenOffset(128);
|
||||
t.setMetaIndexCount(7);
|
||||
|
||||
t.setTotalUncompressedBytes(129731987);
|
||||
|
||||
{
|
||||
DataOutputStream dos = new DataOutputStream(baos); // Limited scope.
|
||||
t.serialize(dos);
|
||||
dos.flush();
|
||||
assertEquals(dos.size(), FixedFileTrailer.getTrailerSize(version));
|
||||
}
|
||||
|
||||
byte[] bytes = baos.toByteArray();
|
||||
baos.reset();
|
||||
|
||||
assertEquals(bytes.length, FixedFileTrailer.getTrailerSize(version));
|
||||
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
|
||||
|
||||
// Finished writing, trying to read.
|
||||
{
|
||||
DataInputStream dis = new DataInputStream(bais);
|
||||
FixedFileTrailer t2 = new FixedFileTrailer(version);
|
||||
t2.deserialize(dis);
|
||||
assertEquals(-1, bais.read()); // Ensure we have read everything.
|
||||
checkLoadedTrailer(version, t, t2);
|
||||
}
|
||||
|
||||
// Now check what happens if the trailer is corrupted.
|
||||
Path trailerPath = new Path(HBaseTestingUtility.getTestDir(), "trailer_"
|
||||
+ version);
|
||||
|
||||
{
|
||||
for (byte invalidVersion : new byte[] { HFile.MIN_FORMAT_VERSION - 1,
|
||||
HFile.MAX_FORMAT_VERSION + 1}) {
|
||||
bytes[bytes.length - 1] = invalidVersion;
|
||||
writeTrailer(trailerPath, null, bytes);
|
||||
try {
|
||||
readTrailer(trailerPath);
|
||||
fail("Exception expected");
|
||||
} catch (IOException ex) {
|
||||
// Make it easy to debug this.
|
||||
String msg = ex.getMessage();
|
||||
String cleanMsg = msg.replaceAll(
|
||||
"^(java(\\.[a-zA-Z]+)+:\\s+)?|\\s+\\(.*\\)\\s*$", "");
|
||||
assertEquals("Actual exception message is \"" + msg + "\".\n" +
|
||||
"Cleaned-up message", // will be followed by " expected: ..."
|
||||
"Invalid HFile version: " + invalidVersion, cleanMsg);
|
||||
LOG.info("Got an expected exception: " + msg);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Now write the trailer into a file and auto-detect the version.
|
||||
writeTrailer(trailerPath, t, null);
|
||||
|
||||
FixedFileTrailer t4 = readTrailer(trailerPath);
|
||||
|
||||
checkLoadedTrailer(version, t, t4);
|
||||
|
||||
String trailerStr = t.toString();
|
||||
assertEquals("Invalid number of fields in the string representation "
|
||||
+ "of the trailer: " + trailerStr, NUM_FIELDS_BY_VERSION[version - 1],
|
||||
trailerStr.split(", ").length);
|
||||
assertEquals(trailerStr, t4.toString());
|
||||
}
|
||||
|
||||
private FixedFileTrailer readTrailer(Path trailerPath) throws IOException {
|
||||
FSDataInputStream fsdis = fs.open(trailerPath);
|
||||
FixedFileTrailer trailerRead = FixedFileTrailer.readFromStream(fsdis,
|
||||
fs.getFileStatus(trailerPath).getLen());
|
||||
fsdis.close();
|
||||
return trailerRead;
|
||||
}
|
||||
|
||||
private void writeTrailer(Path trailerPath, FixedFileTrailer t,
|
||||
byte[] useBytesInstead) throws IOException {
|
||||
assert (t == null) != (useBytesInstead == null); // Expect one non-null.
|
||||
|
||||
FSDataOutputStream fsdos = fs.create(trailerPath);
|
||||
fsdos.write(135); // to make deserializer's job less trivial
|
||||
if (useBytesInstead != null) {
|
||||
fsdos.write(useBytesInstead);
|
||||
} else {
|
||||
t.serialize(fsdos);
|
||||
}
|
||||
fsdos.close();
|
||||
}
|
||||
|
||||
private void checkLoadedTrailer(int version, FixedFileTrailer expected,
|
||||
FixedFileTrailer loaded) throws IOException {
|
||||
assertEquals(version, loaded.getVersion());
|
||||
assertEquals(expected.getDataIndexCount(), loaded.getDataIndexCount());
|
||||
|
||||
assertEquals(Math.min(expected.getEntryCount(),
|
||||
version == 1 ? Integer.MAX_VALUE : Long.MAX_VALUE),
|
||||
loaded.getEntryCount());
|
||||
|
||||
if (version == 1) {
|
||||
assertEquals(expected.getFileInfoOffset(), loaded.getFileInfoOffset());
|
||||
}
|
||||
|
||||
if (version == 2) {
|
||||
assertEquals(expected.getLastDataBlockOffset(),
|
||||
loaded.getLastDataBlockOffset());
|
||||
assertEquals(expected.getNumDataIndexLevels(),
|
||||
loaded.getNumDataIndexLevels());
|
||||
assertEquals(expected.createComparator().getClass().getName(),
|
||||
loaded.createComparator().getClass().getName());
|
||||
assertEquals(expected.getFirstDataBlockOffset(),
|
||||
loaded.getFirstDataBlockOffset());
|
||||
assertTrue(
|
||||
expected.createComparator() instanceof KeyValue.KeyComparator);
|
||||
assertEquals(expected.getUncompressedDataIndexSize(),
|
||||
loaded.getUncompressedDataIndexSize());
|
||||
}
|
||||
|
||||
assertEquals(expected.getLoadOnOpenDataOffset(),
|
||||
loaded.getLoadOnOpenDataOffset());
|
||||
assertEquals(expected.getMetaIndexCount(), loaded.getMetaIndexCount());
|
||||
|
||||
assertEquals(expected.getTotalUncompressedBytes(),
|
||||
loaded.getTotalUncompressedBytes());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,499 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
|
||||
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestHFileBlock {
|
||||
|
||||
private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
|
||||
|
||||
static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
|
||||
NONE, GZ };
|
||||
|
||||
// In case we need to temporarily switch some test cases to just test gzip.
|
||||
static final Compression.Algorithm[] GZIP_ONLY = { GZ };
|
||||
|
||||
private static final int NUM_TEST_BLOCKS = 1000;
|
||||
|
||||
private static final int NUM_READER_THREADS = 26;
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
private FileSystem fs;
|
||||
private int uncompressedSizeV1;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
fs = FileSystem.get(TEST_UTIL.getConfiguration());
|
||||
TEST_UTIL.initTestDir();
|
||||
}
|
||||
|
||||
public void writeTestBlockContents(DataOutputStream dos) throws IOException {
|
||||
// This compresses really well.
|
||||
for (int i = 0; i < 1000; ++i)
|
||||
dos.writeInt(i / 100);
|
||||
}
|
||||
|
||||
public byte[] createTestV1Block(Compression.Algorithm algo)
|
||||
throws IOException {
|
||||
Compressor compressor = algo.getCompressor();
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
OutputStream os = algo.createCompressionStream(baos, compressor, 0);
|
||||
DataOutputStream dos = new DataOutputStream(os);
|
||||
BlockType.META.write(dos); // Let's make this a meta block.
|
||||
writeTestBlockContents(dos);
|
||||
uncompressedSizeV1 = dos.size();
|
||||
dos.flush();
|
||||
algo.returnCompressor(compressor);
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
private byte[] createTestV2Block(Compression.Algorithm algo)
|
||||
throws IOException {
|
||||
final BlockType blockType = BlockType.DATA;
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
|
||||
DataOutputStream dos = hbw.startWriting(blockType, false);
|
||||
writeTestBlockContents(dos);
|
||||
byte[] headerAndData = hbw.getHeaderAndData();
|
||||
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
|
||||
hbw.releaseCompressor();
|
||||
return headerAndData;
|
||||
}
|
||||
|
||||
public String createTestBlockStr(Compression.Algorithm algo)
|
||||
throws IOException {
|
||||
byte[] testV2Block = createTestV2Block(algo);
|
||||
int osOffset = HFileBlock.HEADER_SIZE + 9;
|
||||
if (osOffset < testV2Block.length) {
|
||||
// Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
|
||||
// variations across operating systems.
|
||||
// See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
|
||||
testV2Block[osOffset] = 3;
|
||||
}
|
||||
return Bytes.toStringBinary(testV2Block);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoCompression() throws IOException {
|
||||
assertEquals(4000 + HFileBlock.HEADER_SIZE, createTestV2Block(NONE).length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGzipCompression() throws IOException {
|
||||
assertEquals(
|
||||
"DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
|
||||
+ "\\xFF\\xFF\\xFF\\xFF"
|
||||
// gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
|
||||
+ "\\x1F\\x8B" // gzip magic signature
|
||||
+ "\\x08" // Compression method: 8 = "deflate"
|
||||
+ "\\x00" // Flags
|
||||
+ "\\x00\\x00\\x00\\x00" // mtime
|
||||
+ "\\x00" // XFL (extra flags)
|
||||
// OS (0 = FAT filesystems, 3 = Unix). However, this field
|
||||
// sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
|
||||
+ "\\x03"
|
||||
+ "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
|
||||
+ "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
|
||||
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00",
|
||||
createTestBlockStr(GZ));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReaderV1() throws IOException {
|
||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
byte[] block = createTestV1Block(algo);
|
||||
Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v1_"
|
||||
+ algo);
|
||||
LOG.info("Creating temporary file at " + path);
|
||||
FSDataOutputStream os = fs.create(path);
|
||||
int totalSize = 0;
|
||||
int numBlocks = 50;
|
||||
for (int i = 0; i < numBlocks; ++i) {
|
||||
os.write(block);
|
||||
totalSize += block.length;
|
||||
}
|
||||
os.close();
|
||||
|
||||
FSDataInputStream is = fs.open(path);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo,
|
||||
totalSize);
|
||||
HFileBlock b;
|
||||
int numBlocksRead = 0;
|
||||
long pos = 0;
|
||||
while (pos < totalSize) {
|
||||
b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
|
||||
b.sanityCheck();
|
||||
pos += block.length;
|
||||
numBlocksRead++;
|
||||
}
|
||||
assertEquals(numBlocks, numBlocksRead);
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReaderV2() throws IOException {
|
||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
Path path = new Path(HBaseTestingUtility.getTestDir(), "blocks_v2_"
|
||||
+ algo);
|
||||
FSDataOutputStream os = fs.create(path);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(algo);
|
||||
long totalSize = 0;
|
||||
for (int blockId = 0; blockId < 2; ++blockId) {
|
||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA, false);
|
||||
for (int i = 0; i < 1234; ++i)
|
||||
dos.writeInt(i);
|
||||
hbw.writeHeaderAndData(os);
|
||||
totalSize += hbw.getOnDiskSizeWithHeader();
|
||||
}
|
||||
os.close();
|
||||
|
||||
FSDataInputStream is = fs.open(path);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
|
||||
totalSize);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
|
||||
is.close();
|
||||
|
||||
b.sanityCheck();
|
||||
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
|
||||
assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader());
|
||||
String blockStr = b.toString();
|
||||
|
||||
if (algo == GZ) {
|
||||
is = fs.open(path);
|
||||
hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
|
||||
b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1, pread);
|
||||
assertEquals(blockStr, b.toString());
|
||||
int wrongCompressedSize = 2172;
|
||||
try {
|
||||
b = hbr.readBlockData(0, wrongCompressedSize
|
||||
+ HFileBlock.HEADER_SIZE, -1, pread);
|
||||
fail("Exception expected");
|
||||
} catch (IOException ex) {
|
||||
String expectedPrefix = "On-disk size without header provided is "
|
||||
+ wrongCompressedSize + ", but block header contains "
|
||||
+ b.getOnDiskSizeWithoutHeader() + ".";
|
||||
assertTrue("Invalid exception message: '" + ex.getMessage()
|
||||
+ "'.\nMessage is expected to start with: '" + expectedPrefix
|
||||
+ "'", ex.getMessage().startsWith(expectedPrefix));
|
||||
}
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreviousOffset() throws IOException {
|
||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
||||
for (boolean pread : BOOLEAN_VALUES) {
|
||||
for (boolean cacheOnWrite : BOOLEAN_VALUES) {
|
||||
Random rand = defaultRandom();
|
||||
LOG.info("Compression algorithm: " + algo + ", pread=" + pread);
|
||||
Path path = new Path(HBaseTestingUtility.getTestDir(), "prev_offset");
|
||||
List<Long> expectedOffsets = new ArrayList<Long>();
|
||||
List<Long> expectedPrevOffsets = new ArrayList<Long>();
|
||||
List<BlockType> expectedTypes = new ArrayList<BlockType>();
|
||||
List<ByteBuffer> expectedContents = cacheOnWrite
|
||||
? new ArrayList<ByteBuffer>() : null;
|
||||
long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
|
||||
expectedPrevOffsets, expectedTypes, expectedContents, true);
|
||||
|
||||
FSDataInputStream is = fs.open(path);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, algo,
|
||||
totalSize);
|
||||
long curOffset = 0;
|
||||
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
|
||||
if (!pread) {
|
||||
assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
|
||||
HFileBlock.HEADER_SIZE));
|
||||
}
|
||||
|
||||
assertEquals(expectedOffsets.get(i).longValue(), curOffset);
|
||||
|
||||
LOG.info("Reading block #" + i + " at offset " + curOffset);
|
||||
HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
|
||||
LOG.info("Block #" + i + ": " + b);
|
||||
assertEquals("Invalid block #" + i + "'s type:",
|
||||
expectedTypes.get(i), b.getBlockType());
|
||||
assertEquals("Invalid previous block offset for block " + i
|
||||
+ " of " + "type " + b.getBlockType() + ":",
|
||||
(long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
|
||||
b.sanityCheck();
|
||||
assertEquals(curOffset, b.getOffset());
|
||||
|
||||
// Now re-load this block knowing the on-disk size. This tests a
|
||||
// different branch in the loader.
|
||||
HFileBlock b2 = hbr.readBlockData(curOffset,
|
||||
b.getOnDiskSizeWithHeader(), -1, pread);
|
||||
b2.sanityCheck();
|
||||
|
||||
assertEquals(b.getBlockType(), b2.getBlockType());
|
||||
assertEquals(b.getOnDiskSizeWithoutHeader(),
|
||||
b2.getOnDiskSizeWithoutHeader());
|
||||
assertEquals(b.getOnDiskSizeWithHeader(),
|
||||
b2.getOnDiskSizeWithHeader());
|
||||
assertEquals(b.getUncompressedSizeWithoutHeader(),
|
||||
b2.getUncompressedSizeWithoutHeader());
|
||||
assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
|
||||
assertEquals(curOffset, b2.getOffset());
|
||||
|
||||
curOffset += b.getOnDiskSizeWithHeader();
|
||||
|
||||
if (cacheOnWrite) {
|
||||
// In the cache-on-write mode we store uncompressed bytes so we
|
||||
// can compare them to what was read by the block reader.
|
||||
|
||||
ByteBuffer bufRead = b.getBufferWithHeader();
|
||||
ByteBuffer bufExpected = expectedContents.get(i);
|
||||
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
|
||||
bufRead.arrayOffset(), bufRead.limit(),
|
||||
bufExpected.array(), bufExpected.arrayOffset(),
|
||||
bufExpected.limit()) == 0;
|
||||
String wrongBytesMsg = "";
|
||||
|
||||
if (!bytesAreCorrect) {
|
||||
// Optimization: only construct an error message in case we
|
||||
// will need it.
|
||||
wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
|
||||
+ algo + ", pread=" + pread + "):\n";
|
||||
wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
|
||||
bufExpected.arrayOffset(), Math.min(32,
|
||||
bufExpected.limit()))
|
||||
+ ", actual:\n"
|
||||
+ Bytes.toStringBinary(bufRead.array(),
|
||||
bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
|
||||
}
|
||||
|
||||
assertTrue(wrongBytesMsg, bytesAreCorrect);
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(curOffset, fs.getFileStatus(path).getLen());
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Random defaultRandom() {
|
||||
return new Random(189237);
|
||||
}
|
||||
|
||||
private class BlockReaderThread implements Callable<Boolean> {
|
||||
private final String clientId;
|
||||
private final HFileBlock.FSReader hbr;
|
||||
private final List<Long> offsets;
|
||||
private final List<BlockType> types;
|
||||
private final long fileSize;
|
||||
|
||||
public BlockReaderThread(String clientId,
|
||||
HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
|
||||
long fileSize) {
|
||||
this.clientId = clientId;
|
||||
this.offsets = offsets;
|
||||
this.hbr = hbr;
|
||||
this.types = types;
|
||||
this.fileSize = fileSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
Random rand = new Random(clientId.hashCode());
|
||||
long endTime = System.currentTimeMillis() + 10000;
|
||||
int numBlocksRead = 0;
|
||||
int numPositionalRead = 0;
|
||||
int numWithOnDiskSize = 0;
|
||||
while (System.currentTimeMillis() < endTime) {
|
||||
int blockId = rand.nextInt(NUM_TEST_BLOCKS);
|
||||
long offset = offsets.get(blockId);
|
||||
boolean pread = rand.nextBoolean();
|
||||
boolean withOnDiskSize = rand.nextBoolean();
|
||||
long expectedSize =
|
||||
(blockId == NUM_TEST_BLOCKS - 1 ? fileSize
|
||||
: offsets.get(blockId + 1)) - offset;
|
||||
|
||||
HFileBlock b;
|
||||
try {
|
||||
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
|
||||
b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error in client " + clientId + " trying to read block at "
|
||||
+ offset + ", pread=" + pread + ", withOnDiskSize=" +
|
||||
withOnDiskSize, ex);
|
||||
return false;
|
||||
}
|
||||
|
||||
assertEquals(types.get(blockId), b.getBlockType());
|
||||
assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
|
||||
assertEquals(offset, b.getOffset());
|
||||
|
||||
++numBlocksRead;
|
||||
if (pread)
|
||||
++numPositionalRead;
|
||||
if (withOnDiskSize)
|
||||
++numWithOnDiskSize;
|
||||
}
|
||||
LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
|
||||
" blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
|
||||
"specified: " + numWithOnDiskSize + ")");
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentReading() throws Exception {
|
||||
for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
|
||||
Path path =
|
||||
new Path(HBaseTestingUtility.getTestDir(), "concurrent_reading");
|
||||
Random rand = defaultRandom();
|
||||
List<Long> offsets = new ArrayList<Long>();
|
||||
List<BlockType> types = new ArrayList<BlockType>();
|
||||
writeBlocks(rand, compressAlgo, path, offsets, null, types, null, false);
|
||||
FSDataInputStream is = fs.open(path);
|
||||
long fileSize = fs.getFileStatus(path).getLen();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, compressAlgo,
|
||||
fileSize);
|
||||
|
||||
Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
|
||||
ExecutorCompletionService<Boolean> ecs =
|
||||
new ExecutorCompletionService<Boolean>(exec);
|
||||
|
||||
for (int i = 0; i < NUM_READER_THREADS; ++i) {
|
||||
ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
|
||||
offsets, types, fileSize));
|
||||
}
|
||||
|
||||
for (int i = 0; i < NUM_READER_THREADS; ++i) {
|
||||
Future<Boolean> result = ecs.take();
|
||||
assertTrue(result.get());
|
||||
LOG.info(String.valueOf(i + 1)
|
||||
+ " reader threads finished successfully (algo=" + compressAlgo
|
||||
+ ")");
|
||||
}
|
||||
|
||||
is.close();
|
||||
}
|
||||
}
|
||||
|
||||
private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
|
||||
Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
|
||||
List<BlockType> expectedTypes, List<ByteBuffer> expectedContents,
|
||||
boolean detailedLogging) throws IOException {
|
||||
boolean cacheOnWrite = expectedContents != null;
|
||||
FSDataOutputStream os = fs.create(path);
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo);
|
||||
Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
|
||||
long totalSize = 0;
|
||||
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
|
||||
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
|
||||
BlockType bt = BlockType.values()[blockTypeOrdinal];
|
||||
DataOutputStream dos = hbw.startWriting(bt, cacheOnWrite);
|
||||
for (int j = 0; j < rand.nextInt(500); ++j) {
|
||||
// This might compress well.
|
||||
dos.writeShort(i + 1);
|
||||
dos.writeInt(j + 1);
|
||||
}
|
||||
|
||||
if (expectedOffsets != null)
|
||||
expectedOffsets.add(os.getPos());
|
||||
|
||||
if (expectedPrevOffsets != null) {
|
||||
Long prevOffset = prevOffsetByType.get(bt);
|
||||
expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
|
||||
prevOffsetByType.put(bt, os.getPos());
|
||||
}
|
||||
|
||||
expectedTypes.add(bt);
|
||||
|
||||
hbw.writeHeaderAndData(os);
|
||||
totalSize += hbw.getOnDiskSizeWithHeader();
|
||||
|
||||
if (cacheOnWrite)
|
||||
expectedContents.add(hbw.getUncompressedBufferWithHeader());
|
||||
|
||||
if (detailedLogging) {
|
||||
LOG.info("Writing block #" + i + " of type " + bt
|
||||
+ ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
|
||||
+ " at offset " + os.getPos());
|
||||
}
|
||||
}
|
||||
os.close();
|
||||
LOG.info("Created a temporary file at " + path + ", "
|
||||
+ fs.getFileStatus(path).getLen() + " byte, compression=" +
|
||||
compressAlgo);
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockHeapSize() {
|
||||
for (int size : new int[] { 100, 256, 12345 }) {
|
||||
byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
|
||||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
true, -1);
|
||||
assertEquals(80, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
|
||||
long expected = ClassSize.align(ClassSize.estimateBase(HFileBlock.class,
|
||||
true)
|
||||
+ ClassSize.estimateBase(buf.getClass(), true)
|
||||
+ HFileBlock.HEADER_SIZE + size);
|
||||
assertEquals(expected, block.heapSize());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,602 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestHFileBlockIndex {
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> compressionAlgorithms() {
|
||||
return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED;
|
||||
}
|
||||
|
||||
public TestHFileBlockIndex(Compression.Algorithm compr) {
|
||||
this.compr = compr;
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestHFileBlockIndex.class);
|
||||
|
||||
private static final int NUM_DATA_BLOCKS = 1000;
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
private static final int SMALL_BLOCK_SIZE = 4096;
|
||||
private static final int NUM_KV = 10000;
|
||||
|
||||
private static FileSystem fs;
|
||||
private Path path;
|
||||
private Random rand;
|
||||
private long rootIndexOffset;
|
||||
private int numRootEntries;
|
||||
private int numLevels;
|
||||
private static final List<byte[]> keys = new ArrayList<byte[]>();
|
||||
private final Compression.Algorithm compr;
|
||||
private byte[] firstKeyInFile;
|
||||
private Configuration conf;
|
||||
|
||||
private static final int[] INDEX_CHUNK_SIZES = { 4096, 512, 384 };
|
||||
private static final int[] EXPECTED_NUM_LEVELS = { 2, 3, 4 };
|
||||
private static final int[] UNCOMPRESSED_INDEX_SIZES =
|
||||
{ 19187, 21813, 23086 };
|
||||
|
||||
static {
|
||||
assert INDEX_CHUNK_SIZES.length == EXPECTED_NUM_LEVELS.length;
|
||||
assert INDEX_CHUNK_SIZES.length == UNCOMPRESSED_INDEX_SIZES.length;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
keys.clear();
|
||||
rand = new Random(2389757);
|
||||
firstKeyInFile = null;
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
// This test requires at least HFile format version 2.
|
||||
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
|
||||
|
||||
fs = FileSystem.get(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockIndex() throws IOException {
|
||||
path = new Path(HBaseTestingUtility.getTestDir(), "block_index_" + compr);
|
||||
writeWholeIndex();
|
||||
readIndex();
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper around a block reader which only caches the results of the last
|
||||
* operation. Not thread-safe.
|
||||
*/
|
||||
private static class BlockReaderWrapper implements HFileBlock.BasicReader {
|
||||
|
||||
private HFileBlock.BasicReader realReader;
|
||||
private long prevOffset;
|
||||
private long prevOnDiskSize;
|
||||
private long prevUncompressedSize;
|
||||
private boolean prevPread;
|
||||
private HFileBlock prevBlock;
|
||||
|
||||
public int hitCount = 0;
|
||||
public int missCount = 0;
|
||||
|
||||
public BlockReaderWrapper(HFileBlock.BasicReader realReader) {
|
||||
this.realReader = realReader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock readBlockData(long offset, long onDiskSize,
|
||||
int uncompressedSize, boolean pread) throws IOException {
|
||||
if (offset == prevOffset && onDiskSize == prevOnDiskSize &&
|
||||
uncompressedSize == prevUncompressedSize && pread == prevPread) {
|
||||
hitCount += 1;
|
||||
return prevBlock;
|
||||
}
|
||||
|
||||
missCount += 1;
|
||||
prevBlock = realReader.readBlockData(offset, onDiskSize,
|
||||
uncompressedSize, pread);
|
||||
prevOffset = offset;
|
||||
prevOnDiskSize = onDiskSize;
|
||||
prevUncompressedSize = uncompressedSize;
|
||||
prevPread = pread;
|
||||
|
||||
return prevBlock;
|
||||
}
|
||||
}
|
||||
|
||||
public void readIndex() throws IOException {
|
||||
long fileSize = fs.getFileStatus(path).getLen();
|
||||
LOG.info("Size of " + path + ": " + fileSize);
|
||||
|
||||
FSDataInputStream istream = fs.open(path);
|
||||
HFileBlock.FSReader blockReader = new HFileBlock.FSReaderV2(istream,
|
||||
compr, fs.getFileStatus(path).getLen());
|
||||
|
||||
BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
|
||||
HFileBlockIndex.BlockIndexReader indexReader =
|
||||
new HFileBlockIndex.BlockIndexReader(
|
||||
Bytes.BYTES_RAWCOMPARATOR, numLevels, brw);
|
||||
|
||||
indexReader.readRootIndex(blockReader.blockRange(rootIndexOffset,
|
||||
fileSize).nextBlockAsStream(BlockType.ROOT_INDEX), numRootEntries);
|
||||
|
||||
long prevOffset = -1;
|
||||
int i = 0;
|
||||
int expectedHitCount = 0;
|
||||
int expectedMissCount = 0;
|
||||
LOG.info("Total number of keys: " + keys.size());
|
||||
for (byte[] key : keys) {
|
||||
assertTrue(key != null);
|
||||
assertTrue(indexReader != null);
|
||||
HFileBlock b = indexReader.seekToDataBlock(key, 0, key.length, null);
|
||||
if (Bytes.BYTES_RAWCOMPARATOR.compare(key, firstKeyInFile) < 0) {
|
||||
assertTrue(b == null);
|
||||
++i;
|
||||
continue;
|
||||
}
|
||||
|
||||
String keyStr = "key #" + i + ", " + Bytes.toStringBinary(key);
|
||||
|
||||
assertTrue("seekToDataBlock failed for " + keyStr, b != null);
|
||||
|
||||
if (prevOffset == b.getOffset()) {
|
||||
assertEquals(++expectedHitCount, brw.hitCount);
|
||||
} else {
|
||||
LOG.info("First key in a new block: " + keyStr + ", block offset: "
|
||||
+ b.getOffset() + ")");
|
||||
assertTrue(b.getOffset() > prevOffset);
|
||||
assertEquals(++expectedMissCount, brw.missCount);
|
||||
prevOffset = b.getOffset();
|
||||
}
|
||||
++i;
|
||||
}
|
||||
|
||||
istream.close();
|
||||
}
|
||||
|
||||
private void writeWholeIndex() throws IOException {
|
||||
assertEquals(0, keys.size());
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(compr);
|
||||
FSDataOutputStream outputStream = fs.create(path);
|
||||
HFileBlockIndex.BlockIndexWriter biw =
|
||||
new HFileBlockIndex.BlockIndexWriter(hbw, null, null);
|
||||
|
||||
for (int i = 0; i < NUM_DATA_BLOCKS; ++i) {
|
||||
hbw.startWriting(BlockType.DATA, false).write(
|
||||
String.valueOf(rand.nextInt(1000)).getBytes());
|
||||
long blockOffset = outputStream.getPos();
|
||||
hbw.writeHeaderAndData(outputStream);
|
||||
|
||||
byte[] firstKey = null;
|
||||
for (int j = 0; j < 16; ++j) {
|
||||
byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i * 16 + j);
|
||||
keys.add(k);
|
||||
if (j == 8)
|
||||
firstKey = k;
|
||||
}
|
||||
assertTrue(firstKey != null);
|
||||
if (firstKeyInFile == null)
|
||||
firstKeyInFile = firstKey;
|
||||
biw.addEntry(firstKey, blockOffset, hbw.getOnDiskSizeWithHeader());
|
||||
|
||||
writeInlineBlocks(hbw, outputStream, biw, false);
|
||||
}
|
||||
writeInlineBlocks(hbw, outputStream, biw, true);
|
||||
rootIndexOffset = biw.writeIndexBlocks(outputStream);
|
||||
outputStream.close();
|
||||
|
||||
numLevels = biw.getNumLevels();
|
||||
numRootEntries = biw.getNumRootEntries();
|
||||
|
||||
LOG.info("Index written: numLevels=" + numLevels + ", numRootEntries=" +
|
||||
numRootEntries + ", rootIndexOffset=" + rootIndexOffset);
|
||||
}
|
||||
|
||||
private void writeInlineBlocks(HFileBlock.Writer hbw,
|
||||
FSDataOutputStream outputStream, HFileBlockIndex.BlockIndexWriter biw,
|
||||
boolean isClosing) throws IOException {
|
||||
while (biw.shouldWriteBlock(isClosing)) {
|
||||
long offset = outputStream.getPos();
|
||||
biw.writeInlineBlock(hbw.startWriting(biw.getInlineBlockType(), false));
|
||||
hbw.writeHeaderAndData(outputStream);
|
||||
biw.blockWritten(offset, hbw.getOnDiskSizeWithHeader(),
|
||||
hbw.getUncompressedSizeWithoutHeader());
|
||||
LOG.info("Wrote an inline index block at " + offset + ", size " +
|
||||
hbw.getOnDiskSizeWithHeader());
|
||||
}
|
||||
}
|
||||
|
||||
private static final long getDummyFileOffset(int i) {
|
||||
return i * 185 + 379;
|
||||
}
|
||||
|
||||
private static final int getDummyOnDiskSize(int i) {
|
||||
return i * i * 37 + i * 19 + 13;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondaryIndexBinarySearch() throws IOException {
|
||||
int numTotalKeys = 99;
|
||||
assertTrue(numTotalKeys % 2 == 1); // Ensure no one made this even.
|
||||
|
||||
// We only add odd-index keys into the array that we will binary-search.
|
||||
int numSearchedKeys = (numTotalKeys - 1) / 2;
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
DataOutputStream dos = new DataOutputStream(baos);
|
||||
|
||||
dos.writeInt(numSearchedKeys);
|
||||
int curAllEntriesSize = 0;
|
||||
int numEntriesAdded = 0;
|
||||
|
||||
// Only odd-index elements of this array are used to keep the secondary
|
||||
// index entries of the corresponding keys.
|
||||
int secondaryIndexEntries[] = new int[numTotalKeys];
|
||||
|
||||
for (int i = 0; i < numTotalKeys; ++i) {
|
||||
byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i * 2);
|
||||
keys.add(k);
|
||||
String msgPrefix = "Key #" + i + " (" + Bytes.toStringBinary(k) + "): ";
|
||||
StringBuilder padding = new StringBuilder();
|
||||
while (msgPrefix.length() + padding.length() < 70)
|
||||
padding.append(' ');
|
||||
msgPrefix += padding;
|
||||
if (i % 2 == 1) {
|
||||
dos.writeInt(curAllEntriesSize);
|
||||
secondaryIndexEntries[i] = curAllEntriesSize;
|
||||
LOG.info(msgPrefix + "secondary index entry #" + ((i - 1) / 2) +
|
||||
", offset " + curAllEntriesSize);
|
||||
curAllEntriesSize += k.length
|
||||
+ HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD;
|
||||
++numEntriesAdded;
|
||||
} else {
|
||||
secondaryIndexEntries[i] = -1;
|
||||
LOG.info(msgPrefix + "not in the searched array");
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the keys are increasing.
|
||||
for (int i = 0; i < keys.size() - 1; ++i)
|
||||
assertTrue(Bytes.BYTES_RAWCOMPARATOR.compare(keys.get(i),
|
||||
keys.get(i + 1)) < 0);
|
||||
|
||||
dos.writeInt(curAllEntriesSize);
|
||||
assertEquals(numSearchedKeys, numEntriesAdded);
|
||||
int secondaryIndexOffset = dos.size();
|
||||
assertEquals(Bytes.SIZEOF_INT * (numSearchedKeys + 2),
|
||||
secondaryIndexOffset);
|
||||
|
||||
for (int i = 1; i <= numTotalKeys - 1; i += 2) {
|
||||
assertEquals(dos.size(),
|
||||
secondaryIndexOffset + secondaryIndexEntries[i]);
|
||||
long dummyFileOffset = getDummyFileOffset(i);
|
||||
int dummyOnDiskSize = getDummyOnDiskSize(i);
|
||||
LOG.debug("Storing file offset=" + dummyFileOffset + " and onDiskSize=" +
|
||||
dummyOnDiskSize + " at offset " + dos.size());
|
||||
dos.writeLong(dummyFileOffset);
|
||||
dos.writeInt(dummyOnDiskSize);
|
||||
LOG.debug("Stored key " + ((i - 1) / 2) +" at offset " + dos.size());
|
||||
dos.write(keys.get(i));
|
||||
}
|
||||
|
||||
dos.writeInt(curAllEntriesSize);
|
||||
|
||||
ByteBuffer nonRootIndex = ByteBuffer.wrap(baos.toByteArray());
|
||||
for (int i = 0; i < numTotalKeys; ++i) {
|
||||
byte[] searchKey = keys.get(i);
|
||||
byte[] arrayHoldingKey = new byte[searchKey.length +
|
||||
searchKey.length / 2];
|
||||
|
||||
// To make things a bit more interesting, store the key we are looking
|
||||
// for at a non-zero offset in a new array.
|
||||
System.arraycopy(searchKey, 0, arrayHoldingKey, searchKey.length / 2,
|
||||
searchKey.length);
|
||||
|
||||
int searchResult = BlockIndexReader.binarySearchNonRootIndex(
|
||||
arrayHoldingKey, searchKey.length / 2, searchKey.length, nonRootIndex,
|
||||
Bytes.BYTES_RAWCOMPARATOR);
|
||||
String lookupFailureMsg = "Failed to look up key #" + i + " ("
|
||||
+ Bytes.toStringBinary(searchKey) + ")";
|
||||
|
||||
int expectedResult;
|
||||
int referenceItem;
|
||||
|
||||
if (i % 2 == 1) {
|
||||
// This key is in the array we search as the element (i - 1) / 2. Make
|
||||
// sure we find it.
|
||||
expectedResult = (i - 1) / 2;
|
||||
referenceItem = i;
|
||||
} else {
|
||||
// This key is not in the array but between two elements on the array,
|
||||
// in the beginning, or in the end. The result should be the previous
|
||||
// key in the searched array, or -1 for i = 0.
|
||||
expectedResult = i / 2 - 1;
|
||||
referenceItem = i - 1;
|
||||
}
|
||||
|
||||
assertEquals(lookupFailureMsg, expectedResult, searchResult);
|
||||
|
||||
// Now test we can get the offset and the on-disk-size using a
|
||||
// higher-level API function.s
|
||||
boolean locateBlockResult =
|
||||
BlockIndexReader.locateNonRootIndexEntry(nonRootIndex, arrayHoldingKey,
|
||||
searchKey.length / 2, searchKey.length, Bytes.BYTES_RAWCOMPARATOR);
|
||||
|
||||
if (i == 0) {
|
||||
assertFalse(locateBlockResult);
|
||||
} else {
|
||||
assertTrue(locateBlockResult);
|
||||
String errorMsg = "i=" + i + ", position=" + nonRootIndex.position();
|
||||
assertEquals(errorMsg, getDummyFileOffset(referenceItem),
|
||||
nonRootIndex.getLong());
|
||||
assertEquals(errorMsg, getDummyOnDiskSize(referenceItem),
|
||||
nonRootIndex.getInt());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockIndexChunk() throws IOException {
|
||||
BlockIndexChunk c = new BlockIndexChunk();
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
int N = 1000;
|
||||
int[] numSubEntriesAt = new int[N];
|
||||
int numSubEntries = 0;
|
||||
for (int i = 0; i < N; ++i) {
|
||||
baos.reset();
|
||||
DataOutputStream dos = new DataOutputStream(baos);
|
||||
c.writeNonRoot(dos);
|
||||
assertEquals(c.getNonRootSize(), dos.size());
|
||||
|
||||
baos.reset();
|
||||
dos = new DataOutputStream(baos);
|
||||
c.writeRoot(dos);
|
||||
assertEquals(c.getRootSize(), dos.size());
|
||||
|
||||
byte[] k = TestHFileWriterV2.randomOrderedKey(rand, i);
|
||||
numSubEntries += rand.nextInt(5) + 1;
|
||||
keys.add(k);
|
||||
c.add(k, getDummyFileOffset(i), getDummyOnDiskSize(i), numSubEntries);
|
||||
}
|
||||
|
||||
// Test the ability to look up the entry that contains a particular
|
||||
// deeper-level index block's entry ("sub-entry"), assuming a global
|
||||
// 0-based ordering of sub-entries. This is needed for mid-key calculation.
|
||||
for (int i = 0; i < N; ++i) {
|
||||
for (int j = i == 0 ? 0 : numSubEntriesAt[i - 1];
|
||||
j < numSubEntriesAt[i];
|
||||
++j) {
|
||||
assertEquals(i, c.getEntryBySubEntry(j));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Checks if the HeapSize calculator is within reason */
|
||||
@Test
|
||||
public void testHeapSizeForBlockIndex() throws IOException {
|
||||
Class<HFileBlockIndex.BlockIndexReader> cl =
|
||||
HFileBlockIndex.BlockIndexReader.class;
|
||||
long expected = ClassSize.estimateBase(cl, false);
|
||||
|
||||
HFileBlockIndex.BlockIndexReader bi =
|
||||
new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
|
||||
long actual = bi.heapSize();
|
||||
|
||||
// Since the arrays in BlockIndex(byte [][] blockKeys, long [] blockOffsets,
|
||||
// int [] blockDataSizes) are all null they are not going to show up in the
|
||||
// HeapSize calculation, so need to remove those array costs from expected.
|
||||
expected -= ClassSize.align(3 * ClassSize.ARRAY);
|
||||
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing block index through the HFile writer/reader APIs. Allows to test
|
||||
* setting index block size through configuration, intermediate-level index
|
||||
* blocks, and caching index blocks on write.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testHFileWriterAndReader() throws IOException {
|
||||
Path hfilePath = new Path(HBaseTestingUtility.getTestDir(),
|
||||
"hfile_for_block_index");
|
||||
BlockCache blockCache = StoreFile.getBlockCache(conf);
|
||||
|
||||
for (int testI = 0; testI < INDEX_CHUNK_SIZES.length; ++testI) {
|
||||
int indexBlockSize = INDEX_CHUNK_SIZES[testI];
|
||||
int expectedNumLevels = EXPECTED_NUM_LEVELS[testI];
|
||||
LOG.info("Index block size: " + indexBlockSize + ", compression: "
|
||||
+ compr);
|
||||
// Evict all blocks that were cached-on-write by the previous invocation.
|
||||
blockCache.evictBlocksByPrefix(hfilePath.getName()
|
||||
+ HFile.CACHE_KEY_SEPARATOR);
|
||||
|
||||
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize);
|
||||
Set<String> keyStrSet = new HashSet<String>();
|
||||
byte[][] keys = new byte[NUM_KV][];
|
||||
byte[][] values = new byte[NUM_KV][];
|
||||
|
||||
// Write the HFile
|
||||
{
|
||||
HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fs,
|
||||
hfilePath, SMALL_BLOCK_SIZE, compr, KeyValue.KEY_COMPARATOR);
|
||||
Random rand = new Random(19231737);
|
||||
|
||||
for (int i = 0; i < NUM_KV; ++i) {
|
||||
byte[] row = TestHFileWriterV2.randomOrderedKey(rand, i);
|
||||
|
||||
// Key will be interpreted by KeyValue.KEY_COMPARATOR
|
||||
byte[] k = KeyValue.createFirstOnRow(row, 0, row.length, row, 0, 0,
|
||||
row, 0, 0).getKey();
|
||||
|
||||
byte[] v = TestHFileWriterV2.randomValue(rand);
|
||||
writer.append(k, v);
|
||||
keys[i] = k;
|
||||
values[i] = v;
|
||||
keyStrSet.add(Bytes.toStringBinary(k));
|
||||
|
||||
if (i > 0) {
|
||||
assertTrue(KeyValue.KEY_COMPARATOR.compare(keys[i - 1],
|
||||
keys[i]) < 0);
|
||||
}
|
||||
}
|
||||
|
||||
writer.close();
|
||||
}
|
||||
|
||||
// Read the HFile
|
||||
HFile.Reader reader = HFile.createReader(fs, hfilePath, blockCache,
|
||||
false, true);
|
||||
assertEquals(expectedNumLevels,
|
||||
reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
assertTrue(Bytes.equals(keys[0], reader.getFirstKey()));
|
||||
assertTrue(Bytes.equals(keys[NUM_KV - 1], reader.getLastKey()));
|
||||
LOG.info("Last key: " + Bytes.toStringBinary(keys[NUM_KV - 1]));
|
||||
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
HFileScanner scanner = reader.getScanner(true, pread);
|
||||
for (int i = 0; i < NUM_KV; ++i) {
|
||||
checkSeekTo(keys, scanner, i);
|
||||
checkKeyValue("i=" + i, keys[i], values[i], scanner.getKey(),
|
||||
scanner.getValue());
|
||||
}
|
||||
assertTrue(scanner.seekTo());
|
||||
for (int i = NUM_KV - 1; i >= 0; --i) {
|
||||
checkSeekTo(keys, scanner, i);
|
||||
checkKeyValue("i=" + i, keys[i], values[i], scanner.getKey(),
|
||||
scanner.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// Manually compute the mid-key and validate it.
|
||||
HFileReaderV2 reader2 = (HFileReaderV2) reader;
|
||||
HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader();
|
||||
|
||||
HFileBlock.BlockIterator iter = fsReader.blockRange(0,
|
||||
reader.getTrailer().getLoadOnOpenDataOffset());
|
||||
HFileBlock block;
|
||||
List<byte[]> blockKeys = new ArrayList<byte[]>();
|
||||
while ((block = iter.nextBlock()) != null) {
|
||||
if (block.getBlockType() != BlockType.LEAF_INDEX)
|
||||
return;
|
||||
ByteBuffer b = block.getBufferReadOnly();
|
||||
int n = b.getInt();
|
||||
// One int for the number of items, and n + 1 for the secondary index.
|
||||
int entriesOffset = Bytes.SIZEOF_INT * (n + 2);
|
||||
|
||||
// Get all the keys from the leaf index block. S
|
||||
for (int i = 0; i < n; ++i) {
|
||||
int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 1));
|
||||
int nextKeyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 2));
|
||||
int keyLen = nextKeyRelOffset - keyRelOffset;
|
||||
int keyOffset = b.arrayOffset() + entriesOffset + keyRelOffset +
|
||||
HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD;
|
||||
byte[] blockKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset
|
||||
+ keyLen);
|
||||
String blockKeyStr = Bytes.toString(blockKey);
|
||||
blockKeys.add(blockKey);
|
||||
|
||||
// If the first key of the block is not among the keys written, we
|
||||
// are not parsing the non-root index block format correctly.
|
||||
assertTrue("Invalid block key from leaf-level block: " + blockKeyStr,
|
||||
keyStrSet.contains(blockKeyStr));
|
||||
}
|
||||
}
|
||||
|
||||
// Validate the mid-key.
|
||||
assertEquals(
|
||||
Bytes.toStringBinary(blockKeys.get((blockKeys.size() - 1) / 2)),
|
||||
Bytes.toStringBinary(reader.midkey()));
|
||||
|
||||
assertEquals(UNCOMPRESSED_INDEX_SIZES[testI],
|
||||
reader.getTrailer().getUncompressedDataIndexSize());
|
||||
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkSeekTo(byte[][] keys, HFileScanner scanner, int i)
|
||||
throws IOException {
|
||||
assertEquals("Failed to seek to key #" + i + " ("
|
||||
+ Bytes.toStringBinary(keys[i]) + ")", 0, scanner.seekTo(keys[i]));
|
||||
}
|
||||
|
||||
private void assertArrayEqualsBuffer(String msgPrefix, byte[] arr,
|
||||
ByteBuffer buf) {
|
||||
assertEquals(msgPrefix + ": expected " + Bytes.toStringBinary(arr)
|
||||
+ ", actual " + Bytes.toStringBinary(buf), 0, Bytes.compareTo(arr, 0,
|
||||
arr.length, buf.array(), buf.arrayOffset(), buf.limit()));
|
||||
}
|
||||
|
||||
/** Check a key/value pair after it was read by the reader */
|
||||
private void checkKeyValue(String msgPrefix, byte[] expectedKey,
|
||||
byte[] expectedValue, ByteBuffer keyRead, ByteBuffer valueRead) {
|
||||
if (!msgPrefix.isEmpty())
|
||||
msgPrefix += ". ";
|
||||
|
||||
assertArrayEqualsBuffer(msgPrefix + "Invalid key", expectedKey, keyRead);
|
||||
assertArrayEqualsBuffer(msgPrefix + "Invalid value", expectedValue,
|
||||
valueRead);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestHFileReaderV1 {
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
private Configuration conf;
|
||||
private FileSystem fs;
|
||||
|
||||
private static final int N = 1000;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
fs = FileSystem.get(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadingExistingVersion1HFile() throws IOException {
|
||||
URL url = TestHFileReaderV1.class.getResource(
|
||||
"8e8ab58dcf39412da19833fcd8f687ac");
|
||||
Path existingHFilePath = new Path(url.getPath());
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(fs, existingHFilePath, null, false, false);
|
||||
reader.loadFileInfo();
|
||||
FixedFileTrailer trailer = reader.getTrailer();
|
||||
|
||||
assertEquals(N, reader.getEntries());
|
||||
assertEquals(N, trailer.getEntryCount());
|
||||
assertEquals(1, trailer.getVersion());
|
||||
assertEquals(Compression.Algorithm.GZ, trailer.getCompressionCodec());
|
||||
|
||||
for (boolean pread : new boolean[] { false, true }) {
|
||||
int totalDataSize = 0;
|
||||
int n = 0;
|
||||
|
||||
HFileScanner scanner = reader.getScanner(false, pread);
|
||||
assertTrue(scanner.seekTo());
|
||||
do {
|
||||
totalDataSize += scanner.getKey().limit() + scanner.getValue().limit()
|
||||
+ Bytes.SIZEOF_INT * 2;
|
||||
++n;
|
||||
} while (scanner.next());
|
||||
|
||||
// Add magic record sizes, one per data block.
|
||||
totalDataSize += 8 * trailer.getDataIndexCount();
|
||||
|
||||
assertEquals(N, n);
|
||||
assertEquals(trailer.getTotalUncompressedBytes(), totalDataSize);
|
||||
}
|
||||
reader.close();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,256 @@
|
|||
/**
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Testing writing a version 2 {@link HFile}. This is a low-level test written
|
||||
* during the development of {@link HFileWriterV2}.
|
||||
*/
|
||||
public class TestHFileWriterV2 {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestHFileWriterV2.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
private Configuration conf;
|
||||
private FileSystem fs;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
fs = FileSystem.get(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHFileFormatV2() throws IOException {
|
||||
Path hfilePath = new Path(HBaseTestingUtility.getTestDir(),
|
||||
"testHFileFormatV2");
|
||||
|
||||
final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ;
|
||||
HFileWriterV2 writer = new HFileWriterV2(conf, fs, hfilePath, 4096,
|
||||
COMPRESS_ALGO, KeyValue.KEY_COMPARATOR);
|
||||
|
||||
long totalKeyLength = 0;
|
||||
long totalValueLength = 0;
|
||||
|
||||
Random rand = new Random(9713312); // Just a fixed seed.
|
||||
|
||||
final int ENTRY_COUNT = 10000;
|
||||
List<byte[]> keys = new ArrayList<byte[]>();
|
||||
List<byte[]> values = new ArrayList<byte[]>();
|
||||
|
||||
for (int i = 0; i < ENTRY_COUNT; ++i) {
|
||||
byte[] keyBytes = randomOrderedKey(rand, i);
|
||||
|
||||
// A random-length random value.
|
||||
byte[] valueBytes = randomValue(rand);
|
||||
writer.append(keyBytes, valueBytes);
|
||||
|
||||
totalKeyLength += keyBytes.length;
|
||||
totalValueLength += valueBytes.length;
|
||||
|
||||
keys.add(keyBytes);
|
||||
values.add(valueBytes);
|
||||
}
|
||||
|
||||
// Add in an arbitrary order. They will be sorted lexicographically by
|
||||
// the key.
|
||||
writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
|
||||
writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
|
||||
writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
|
||||
|
||||
writer.close();
|
||||
|
||||
FSDataInputStream fsdis = fs.open(hfilePath);
|
||||
|
||||
// A "manual" version of a new-format HFile reader. This unit test was
|
||||
// written before the V2 reader was fully implemented.
|
||||
|
||||
long fileSize = fs.getFileStatus(hfilePath).getLen();
|
||||
FixedFileTrailer trailer =
|
||||
FixedFileTrailer.readFromStream(fsdis, fileSize);
|
||||
|
||||
assertEquals(2, trailer.getVersion());
|
||||
assertEquals(ENTRY_COUNT, trailer.getEntryCount());
|
||||
|
||||
HFileBlock.FSReader blockReader =
|
||||
new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
|
||||
|
||||
// Counters for the number of key/value pairs and the number of blocks
|
||||
int entriesRead = 0;
|
||||
int blocksRead = 0;
|
||||
|
||||
// Scan blocks the way the reader would scan them
|
||||
fsdis.seek(0);
|
||||
long curBlockPos = 0;
|
||||
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
|
||||
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
|
||||
assertEquals(BlockType.DATA, block.getBlockType());
|
||||
ByteBuffer buf = block.getBufferWithoutHeader();
|
||||
while (buf.hasRemaining()) {
|
||||
int keyLen = buf.getInt();
|
||||
int valueLen = buf.getInt();
|
||||
|
||||
byte[] key = new byte[keyLen];
|
||||
buf.get(key);
|
||||
|
||||
byte[] value = new byte[valueLen];
|
||||
buf.get(value);
|
||||
|
||||
// A brute-force check to see that all keys and values are correct.
|
||||
assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0);
|
||||
assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0);
|
||||
|
||||
++entriesRead;
|
||||
}
|
||||
++blocksRead;
|
||||
curBlockPos += block.getOnDiskSizeWithHeader();
|
||||
}
|
||||
LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead="
|
||||
+ blocksRead);
|
||||
assertEquals(ENTRY_COUNT, entriesRead);
|
||||
|
||||
// Meta blocks. We can scan until the load-on-open data offset (which is
|
||||
// the root block index offset in version 2) because we are not testing
|
||||
// intermediate-level index blocks here.
|
||||
|
||||
int metaCounter = 0;
|
||||
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
|
||||
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
|
||||
trailer.getLoadOnOpenDataOffset());
|
||||
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
|
||||
assertEquals(BlockType.META, block.getBlockType());
|
||||
Text t = new Text();
|
||||
block.readInto(t);
|
||||
Text expectedText =
|
||||
(metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text(
|
||||
"Moscow") : new Text("Washington, D.C."));
|
||||
assertEquals(expectedText, t);
|
||||
LOG.info("Read meta block data: " + t);
|
||||
++metaCounter;
|
||||
curBlockPos += block.getOnDiskSizeWithHeader();
|
||||
}
|
||||
|
||||
fsdis.close();
|
||||
}
|
||||
|
||||
// Static stuff used by various HFile v2 unit tests
|
||||
|
||||
private static final String COLUMN_FAMILY_NAME = "_-myColumnFamily-_";
|
||||
private static final int MIN_ROW_OR_QUALIFIER_LENGTH = 64;
|
||||
private static final int MAX_ROW_OR_QUALIFIER_LENGTH = 128;
|
||||
|
||||
/**
|
||||
* Generates a random key that is guaranteed to increase as the given index i
|
||||
* increases. The result consists of a prefix, which is a deterministic
|
||||
* increasing function of i, and a random suffix.
|
||||
*
|
||||
* @param rand
|
||||
* random number generator to use
|
||||
* @param i
|
||||
* @return
|
||||
*/
|
||||
public static byte[] randomOrderedKey(Random rand, int i) {
|
||||
StringBuilder k = new StringBuilder();
|
||||
|
||||
// The fixed-length lexicographically increasing part of the key.
|
||||
for (int bitIndex = 31; bitIndex >= 0; --bitIndex) {
|
||||
if ((i & (1 << bitIndex)) == 0)
|
||||
k.append("a");
|
||||
else
|
||||
k.append("b");
|
||||
}
|
||||
|
||||
// A random-length random suffix of the key.
|
||||
for (int j = 0; j < rand.nextInt(50); ++j)
|
||||
k.append(randomReadableChar(rand));
|
||||
|
||||
byte[] keyBytes = k.toString().getBytes();
|
||||
return keyBytes;
|
||||
}
|
||||
|
||||
public static byte[] randomValue(Random rand) {
|
||||
StringBuilder v = new StringBuilder();
|
||||
for (int j = 0; j < 1 + rand.nextInt(2000); ++j) {
|
||||
v.append((char) (32 + rand.nextInt(95)));
|
||||
}
|
||||
|
||||
byte[] valueBytes = v.toString().getBytes();
|
||||
return valueBytes;
|
||||
}
|
||||
|
||||
public static final char randomReadableChar(Random rand) {
|
||||
int i = rand.nextInt(26 * 2 + 10 + 1);
|
||||
if (i < 26)
|
||||
return (char) ('A' + i);
|
||||
i -= 26;
|
||||
|
||||
if (i < 26)
|
||||
return (char) ('a' + i);
|
||||
i -= 26;
|
||||
|
||||
if (i < 10)
|
||||
return (char) ('0' + i);
|
||||
i -= 10;
|
||||
|
||||
assert i == 0;
|
||||
return '_';
|
||||
}
|
||||
|
||||
public static byte[] randomRowOrQualifier(Random rand) {
|
||||
StringBuilder field = new StringBuilder();
|
||||
int fieldLen = MIN_ROW_OR_QUALIFIER_LENGTH
|
||||
+ rand.nextInt(MAX_ROW_OR_QUALIFIER_LENGTH
|
||||
- MIN_ROW_OR_QUALIFIER_LENGTH + 1);
|
||||
for (int i = 0; i < fieldLen; ++i)
|
||||
field.append(randomReadableChar(rand));
|
||||
return field.toString().getBytes();
|
||||
}
|
||||
|
||||
public static KeyValue randomKeyValue(Random rand) {
|
||||
return new KeyValue(randomRowOrQualifier(rand),
|
||||
COLUMN_FAMILY_NAME.getBytes(), randomRowOrQualifier(rand),
|
||||
randomValue(rand));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,353 @@
|
|||
/*
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
||||
import org.apache.hadoop.hbase.util.ByteBloomFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CompoundBloomFilter;
|
||||
import org.apache.hadoop.hbase.util.CompoundBloomFilterBase;
|
||||
import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests writing Bloom filter blocks in the same part of the file as data
|
||||
* blocks.
|
||||
*/
|
||||
public class TestCompoundBloomFilter {
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
TestCompoundBloomFilter.class);
|
||||
|
||||
private static final int NUM_TESTS = 9;
|
||||
private static final BloomType BLOOM_TYPES[] = { BloomType.ROW,
|
||||
BloomType.ROW, BloomType.ROWCOL, BloomType.ROWCOL, BloomType.ROW,
|
||||
BloomType.ROWCOL, BloomType.ROWCOL, BloomType.ROWCOL, BloomType.ROW };
|
||||
|
||||
private static final int NUM_KV[];
|
||||
static {
|
||||
final int N = 10000; // Only used in initialization.
|
||||
NUM_KV = new int[] { 21870, N, N, N, N, 1000, N, 7500, 7500};
|
||||
assert NUM_KV.length == NUM_TESTS;
|
||||
}
|
||||
|
||||
private static final int BLOCK_SIZES[];
|
||||
static {
|
||||
final int blkSize = 65536;
|
||||
BLOCK_SIZES = new int[] { 512, 1000, blkSize, blkSize, blkSize, 128, 300,
|
||||
blkSize, blkSize };
|
||||
assert BLOCK_SIZES.length == NUM_TESTS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Be careful not to specify too high a Bloom filter block size, otherwise
|
||||
* there will only be one oversized chunk and the observed false positive
|
||||
* rate will be too low.
|
||||
*/
|
||||
private static final int BLOOM_BLOCK_SIZES[] = { 1000, 4096, 4096, 4096,
|
||||
8192, 128, 1024, 600, 600 };
|
||||
static { assert BLOOM_BLOCK_SIZES.length == NUM_TESTS; }
|
||||
|
||||
private static final double TARGET_ERROR_RATES[] = { 0.025, 0.01, 0.015,
|
||||
0.01, 0.03, 0.01, 0.01, 0.07, 0.07 };
|
||||
static { assert TARGET_ERROR_RATES.length == NUM_TESTS; }
|
||||
|
||||
/** A false positive rate that is obviously too high. */
|
||||
private static final double TOO_HIGH_ERROR_RATE;
|
||||
static {
|
||||
double m = 0;
|
||||
for (double errorRate : TARGET_ERROR_RATES)
|
||||
m = Math.max(m, errorRate);
|
||||
TOO_HIGH_ERROR_RATE = m + 0.03;
|
||||
}
|
||||
|
||||
private static Configuration conf;
|
||||
private FileSystem fs;
|
||||
private BlockCache blockCache;
|
||||
|
||||
/** A message of the form "in test#<number>:" to include in logging. */
|
||||
private String testIdMsg;
|
||||
|
||||
private static final int GENERATION_SEED = 2319;
|
||||
private static final int EVALUATION_SEED = 135;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
// This test requires the most recent HFile format (i.e. v2).
|
||||
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
|
||||
|
||||
fs = FileSystem.get(conf);
|
||||
|
||||
blockCache = StoreFile.getBlockCache(conf);
|
||||
assertNotNull(blockCache);
|
||||
}
|
||||
|
||||
private List<KeyValue> createSortedKeyValues(Random rand, int n) {
|
||||
List<KeyValue> kvList = new ArrayList<KeyValue>(n);
|
||||
for (int i = 0; i < n; ++i)
|
||||
kvList.add(TestHFileWriterV2.randomKeyValue(rand));
|
||||
Collections.sort(kvList, KeyValue.COMPARATOR);
|
||||
return kvList;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompoundBloomFilter() throws IOException {
|
||||
conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
|
||||
for (int t = 0; t < NUM_TESTS; ++t) {
|
||||
conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE,
|
||||
(float) TARGET_ERROR_RATES[t]);
|
||||
|
||||
testIdMsg = "in test #" + t + ":";
|
||||
Random generationRand = new Random(GENERATION_SEED);
|
||||
List<KeyValue> kvs = createSortedKeyValues(generationRand, NUM_KV[t]);
|
||||
BloomType bt = BLOOM_TYPES[t];
|
||||
Path sfPath = writeStoreFile(t, bt, kvs);
|
||||
readStoreFile(t, bt, kvs, sfPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the false positive ratio by computing its z-value and comparing
|
||||
* it to the provided threshold.
|
||||
*
|
||||
* @param falsePosRate experimental positive rate
|
||||
* @param nTrials the number of calls to
|
||||
* {@link StoreFile.Reader#shouldSeek(Scan, java.util.SortedSet)}.
|
||||
* @param zValueBoundary z-value boundary, positive for an upper bound and
|
||||
* negative for a lower bound
|
||||
* @param cbf the compound Bloom filter we are using
|
||||
* @param additionalMsg additional message to include in log output and
|
||||
* assertion failures
|
||||
*/
|
||||
private void validateFalsePosRate(double falsePosRate, int nTrials,
|
||||
double zValueBoundary, CompoundBloomFilter cbf, String additionalMsg) {
|
||||
double p = BloomFilterFactory.getErrorRate(conf);
|
||||
double zValue = (falsePosRate - p) / Math.sqrt(p * (1 - p) / nTrials);
|
||||
|
||||
String assortedStatsStr = " (targetErrorRate=" + p + ", falsePosRate="
|
||||
+ falsePosRate + ", nTrials=" + nTrials + ")";
|
||||
LOG.info("z-value is " + zValue + assortedStatsStr);
|
||||
|
||||
boolean isUpperBound = zValueBoundary > 0;
|
||||
|
||||
if (isUpperBound && zValue > zValueBoundary ||
|
||||
!isUpperBound && zValue < zValueBoundary) {
|
||||
String errorMsg = "False positive rate z-value " + zValue + " is "
|
||||
+ (isUpperBound ? "higher" : "lower") + " than " + zValueBoundary
|
||||
+ assortedStatsStr + ". Per-chunk stats:\n"
|
||||
+ cbf.formatTestingStats();
|
||||
fail(errorMsg + additionalMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void readStoreFile(int t, BloomType bt, List<KeyValue> kvs,
|
||||
Path sfPath) throws IOException {
|
||||
StoreFile sf = new StoreFile(fs, sfPath, true, conf, bt, false);
|
||||
StoreFile.Reader r = sf.createReader();
|
||||
final boolean pread = true; // does not really matter
|
||||
StoreFileScanner scanner = r.getStoreFileScanner(true, pread);
|
||||
|
||||
{
|
||||
// Test for false negatives (not allowed).
|
||||
int numChecked = 0;
|
||||
for (KeyValue kv : kvs) {
|
||||
byte[] row = kv.getRow();
|
||||
boolean present = isInBloom(scanner, row, kv.getQualifier());
|
||||
assertTrue(testIdMsg + " Bloom filter false negative on row "
|
||||
+ Bytes.toStringBinary(row) + " after " + numChecked
|
||||
+ " successful checks", present);
|
||||
++numChecked;
|
||||
}
|
||||
}
|
||||
|
||||
// Test for false positives (some percentage allowed). We test in two modes:
|
||||
// "fake lookup" which ignores the key distribution, and production mode.
|
||||
for (boolean fakeLookupEnabled : new boolean[] { true, false }) {
|
||||
ByteBloomFilter.setFakeLookupMode(fakeLookupEnabled);
|
||||
try {
|
||||
String fakeLookupModeStr = ", fake lookup is " + (fakeLookupEnabled ?
|
||||
"enabled" : "disabled");
|
||||
CompoundBloomFilter cbf = (CompoundBloomFilter) r.getBloomFilter();
|
||||
cbf.enableTestingStats();
|
||||
int numFalsePos = 0;
|
||||
Random rand = new Random(EVALUATION_SEED);
|
||||
int nTrials = NUM_KV[t] * 10;
|
||||
for (int i = 0; i < nTrials; ++i) {
|
||||
byte[] query = TestHFileWriterV2.randomRowOrQualifier(rand);
|
||||
if (isInBloom(scanner, query, bt, rand)) {
|
||||
numFalsePos += 1;
|
||||
}
|
||||
}
|
||||
double falsePosRate = numFalsePos * 1.0 / nTrials;
|
||||
LOG.debug(String.format(testIdMsg
|
||||
+ " False positives: %d out of %d (%f)",
|
||||
numFalsePos, nTrials, falsePosRate) + fakeLookupModeStr);
|
||||
|
||||
// Check for obvious Bloom filter crashes.
|
||||
assertTrue("False positive is too high: " + falsePosRate + " (greater "
|
||||
+ "than " + TOO_HIGH_ERROR_RATE + ")" + fakeLookupModeStr,
|
||||
falsePosRate < TOO_HIGH_ERROR_RATE);
|
||||
|
||||
// Now a more precise check to see if the false positive rate is not
|
||||
// too high. The reason we use a relaxed restriction for the real-world
|
||||
// case as opposed to the "fake lookup" case is that our hash functions
|
||||
// are not completely independent.
|
||||
|
||||
double maxZValue = fakeLookupEnabled ? 1.96 : 2.5;
|
||||
validateFalsePosRate(falsePosRate, nTrials, maxZValue, cbf,
|
||||
fakeLookupModeStr);
|
||||
|
||||
// For checking the lower bound we need to eliminate the last chunk,
|
||||
// because it is frequently smaller and the false positive rate in it
|
||||
// is too low. This does not help if there is only one under-sized
|
||||
// chunk, though.
|
||||
int nChunks = cbf.getNumChunks();
|
||||
if (nChunks > 1) {
|
||||
numFalsePos -= cbf.getNumPositivesForTesting(nChunks - 1);
|
||||
nTrials -= cbf.getNumQueriesForTesting(nChunks - 1);
|
||||
falsePosRate = numFalsePos * 1.0 / nTrials;
|
||||
LOG.info(testIdMsg + " False positive rate without last chunk is " +
|
||||
falsePosRate + fakeLookupModeStr);
|
||||
}
|
||||
|
||||
validateFalsePosRate(falsePosRate, nTrials, -2.58, cbf,
|
||||
fakeLookupModeStr);
|
||||
} finally {
|
||||
ByteBloomFilter.setFakeLookupMode(false);
|
||||
}
|
||||
}
|
||||
|
||||
r.close();
|
||||
}
|
||||
|
||||
private boolean isInBloom(StoreFileScanner scanner, byte[] row, BloomType bt,
|
||||
Random rand) {
|
||||
return isInBloom(scanner, row,
|
||||
TestHFileWriterV2.randomRowOrQualifier(rand));
|
||||
}
|
||||
|
||||
private boolean isInBloom(StoreFileScanner scanner, byte[] row,
|
||||
byte[] qualifier) {
|
||||
Scan scan = new Scan(row, row);
|
||||
TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
columns.add(qualifier);
|
||||
return scanner.shouldSeek(scan, columns);
|
||||
}
|
||||
|
||||
private Path writeStoreFile(int t, BloomType bt, List<KeyValue> kvs)
|
||||
throws IOException {
|
||||
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
|
||||
BLOOM_BLOCK_SIZES[t]);
|
||||
conf.setBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY, true);
|
||||
|
||||
StoreFile.Writer w = StoreFile.createWriter(fs,
|
||||
HBaseTestingUtility.getTestDir(), BLOCK_SIZES[t], null, null, conf,
|
||||
bt, 0);
|
||||
|
||||
assertTrue(w.hasBloom());
|
||||
assertTrue(w.getBloomWriter() instanceof CompoundBloomFilterWriter);
|
||||
CompoundBloomFilterWriter cbbf =
|
||||
(CompoundBloomFilterWriter) w.getBloomWriter();
|
||||
|
||||
int keyCount = 0;
|
||||
KeyValue prev = null;
|
||||
LOG.debug("Total keys/values to insert: " + kvs.size());
|
||||
for (KeyValue kv : kvs) {
|
||||
w.append(kv);
|
||||
|
||||
// Validate the key count in the Bloom filter.
|
||||
boolean newKey = true;
|
||||
if (prev != null) {
|
||||
newKey = !(bt == BloomType.ROW ? KeyValue.COMPARATOR.matchingRows(kv,
|
||||
prev) : KeyValue.COMPARATOR.matchingRowColumn(kv, prev));
|
||||
}
|
||||
if (newKey)
|
||||
++keyCount;
|
||||
assertEquals(keyCount, cbbf.getKeyCount());
|
||||
|
||||
prev = kv;
|
||||
}
|
||||
w.close();
|
||||
|
||||
return w.getPath();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompoundBloomSizing() {
|
||||
int bloomBlockByteSize = 4096;
|
||||
int bloomBlockBitSize = bloomBlockByteSize * 8;
|
||||
double targetErrorRate = 0.01;
|
||||
long maxKeysPerChunk = ByteBloomFilter.idealMaxKeys(bloomBlockBitSize,
|
||||
targetErrorRate);
|
||||
|
||||
long bloomSize1 = bloomBlockByteSize * 8;
|
||||
long bloomSize2 = ByteBloomFilter.computeBitSize(maxKeysPerChunk,
|
||||
targetErrorRate);
|
||||
|
||||
double bloomSizeRatio = (bloomSize2 * 1.0 / bloomSize1);
|
||||
assertTrue(Math.abs(bloomSizeRatio - 0.9999) < 0.0001);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateKey() {
|
||||
CompoundBloomFilterBase cbfb = new CompoundBloomFilterBase();
|
||||
byte[] row = "myRow".getBytes();
|
||||
byte[] qualifier = "myQualifier".getBytes();
|
||||
byte[] rowKey = cbfb.createBloomKey(row, 0, row.length,
|
||||
row, 0, 0);
|
||||
byte[] rowColKey = cbfb.createBloomKey(row, 0, row.length,
|
||||
qualifier, 0, qualifier.length);
|
||||
KeyValue rowKV = KeyValue.createKeyValueFromKey(rowKey);
|
||||
KeyValue rowColKV = KeyValue.createKeyValueFromKey(rowColKey);
|
||||
assertEquals(rowKV.getTimestamp(), rowColKV.getTimestamp());
|
||||
assertEquals(Bytes.toStringBinary(rowKV.getRow()),
|
||||
Bytes.toStringBinary(rowColKV.getRow()));
|
||||
assertEquals(0, rowKV.getQualifier().length);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Copyright 2011 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestIdLock {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestIdLock.class);
|
||||
|
||||
private static final int NUM_IDS = 16;
|
||||
private static final int NUM_THREADS = 128;
|
||||
private static final int NUM_SECONDS = 20;
|
||||
|
||||
private IdLock idLock = new IdLock();
|
||||
|
||||
private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
|
||||
|
||||
private class IdLockTestThread implements Callable<Boolean> {
|
||||
|
||||
private String clientId;
|
||||
|
||||
public IdLockTestThread(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
Thread.currentThread().setName(clientId);
|
||||
Random rand = new Random();
|
||||
long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
|
||||
while (System.currentTimeMillis() < endTime) {
|
||||
long id = rand.nextInt(NUM_IDS);
|
||||
|
||||
LOG.info(clientId + " is waiting for id " + id);
|
||||
IdLock.Entry lockEntry = idLock.getLockEntry(id);
|
||||
try {
|
||||
int sleepMs = 1 + rand.nextInt(4);
|
||||
String owner = idOwner.get(id);
|
||||
if (owner != null) {
|
||||
LOG.error("Id " + id + " already taken by " + owner + ", "
|
||||
+ clientId + " failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
idOwner.put(id, clientId);
|
||||
LOG.info(clientId + " took id " + id + ", sleeping for " +
|
||||
sleepMs + "ms");
|
||||
Thread.sleep(sleepMs);
|
||||
LOG.info(clientId + " is releasing id " + id);
|
||||
idOwner.remove(id);
|
||||
|
||||
} finally {
|
||||
idLock.releaseLockEntry(lockEntry);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleClients() throws Exception {
|
||||
ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
|
||||
try {
|
||||
ExecutorCompletionService<Boolean> ecs =
|
||||
new ExecutorCompletionService<Boolean>(exec);
|
||||
for (int i = 0; i < NUM_THREADS; ++i)
|
||||
ecs.submit(new IdLockTestThread("client_" + i));
|
||||
for (int i = 0; i < NUM_THREADS; ++i) {
|
||||
Future<Boolean> result = ecs.take();
|
||||
assertTrue(result.get());
|
||||
}
|
||||
idLock.assertMapEmpty();
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue