HBASE-3857 more new files

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153645 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-08-03 20:25:28 +00:00
parent 8e2b477566
commit 3eadbe11a1
13 changed files with 4815 additions and 0 deletions

org/apache/hadoop/hbase/io/DoubleOutputStream.java

@@ -0,0 +1,74 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.OutputStream;
/**
* An output stream that writes to two streams on each operation. Does not
* attempt to handle exceptions gracefully. If any operation other than
* {@link #close()} fails on the first stream, it is not called on the second
* stream.
*/
public class DoubleOutputStream extends OutputStream {
private OutputStream out1;
private OutputStream out2;
public DoubleOutputStream(OutputStream out1, OutputStream out2) {
this.out1 = out1;
this.out2 = out2;
}
@Override
public void write(int b) throws IOException {
out1.write(b);
out2.write(b);
}
@Override
public void write(byte b[]) throws IOException {
out1.write(b, 0, b.length);
out2.write(b, 0, b.length);
}
@Override
public void write(byte b[], int off, int len) throws IOException {
out1.write(b, off, len);
out2.write(b, off, len);
}
@Override
public void flush() throws IOException {
out1.flush();
out2.flush();
}
@Override
public void close() throws IOException {
try {
out1.close();
} finally {
// Make sure we at least attempt to close both streams.
out2.close();
}
}
}
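
A minimal usage sketch (not part of this commit; the wrapper class and file name are illustrative), teeing the same bytes to a file and an in-memory buffer:

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;

import org.apache.hadoop.hbase.io.DoubleOutputStream;

public class DoubleOutputStreamExample {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream capture = new ByteArrayOutputStream();
    // Every byte written to "tee" is forwarded to both underlying streams.
    OutputStream tee = new DoubleOutputStream(
        new FileOutputStream("primary.bin"), capture);   // hypothetical file name
    tee.write(new byte[] { 1, 2, 3 });
    tee.flush();
    tee.close();
    System.out.println("captured " + capture.size() + " bytes");  // captured 3 bytes
  }
}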

org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java

@@ -0,0 +1,354 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.io.RawComparator;
/**
* Common functionality needed by all versions of {@link HFile} readers.
*/
public abstract class AbstractHFileReader implements HFile.Reader {
private static final Log LOG = LogFactory.getLog(AbstractHFileReader.class);
/** Filesystem-level block reader for this HFile format version. */
protected HFileBlock.FSReader fsBlockReader;
/** Stream to read from. */
protected FSDataInputStream istream;
/**
* True if we should close the input stream when done. We don't close it if we
* didn't open it.
*/
protected final boolean closeIStream;
/** Data block index reader keeping the root data index in memory */
protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
/** Meta block index reader -- always single level */
protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
protected final FixedFileTrailer trailer;
/** Filled when we read in the trailer. */
protected final Compression.Algorithm compressAlgo;
/** Last key in the file. Filled in when we read in the file info */
protected byte [] lastKey = null;
/** Average key length read from file info */
protected int avgKeyLen = -1;
/** Average value length read from file info */
protected int avgValueLen = -1;
/** Key comparator */
protected RawComparator<byte []> comparator;
/** Size of this file. */
protected final long fileSize;
/** Block cache to use. */
protected final BlockCache blockCache;
protected AtomicLong cacheHits = new AtomicLong();
protected AtomicLong blockLoads = new AtomicLong();
protected AtomicLong metaLoads = new AtomicLong();
/**
* Whether file is from in-memory store (comes from column family
* configuration).
*/
protected boolean inMemory = false;
/**
* Whether blocks of file should be evicted from the block cache when the
* file is being closed
*/
protected final boolean evictOnClose;
/** Path of file */
protected final Path path;
/** File name to be used for block names */
protected final String name;
protected FileInfo fileInfo;
/** Prefix of the form cf.<column_family_name> for statistics counters. */
private final String cfStatsPrefix;
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long fileSize,
final boolean closeIStream,
final BlockCache blockCache, final boolean inMemory,
final boolean evictOnClose) {
this.trailer = trailer;
this.compressAlgo = trailer.getCompressionCodec();
this.blockCache = blockCache;
this.fileSize = fileSize;
this.istream = fsdis;
this.closeIStream = closeIStream;
this.inMemory = inMemory;
this.evictOnClose = evictOnClose;
this.path = path;
this.name = path.getName();
cfStatsPrefix = "cf." + parseCfNameFromPath(path.toString());
}
@SuppressWarnings("serial")
public static class BlockIndexNotLoadedException
extends IllegalStateException {
public BlockIndexNotLoadedException() {
// Add a message in case anyone relies on it as opposed to class name.
super("Block index not loaded");
}
}
protected String toStringFirstKey() {
return KeyValue.keyToString(getFirstKey());
}
protected String toStringLastKey() {
return KeyValue.keyToString(getLastKey());
}
/**
* Parse the HFile path to figure out which table and column family
* it belongs to. This is used to maintain read statistics on a
* per-column-family basis.
*
* @param path HFile path name
* @return the column family name, or "unknown" if it could not be determined
*/
public static String parseCfNameFromPath(String path) {
String splits[] = path.split("/");
if (splits.length < 2) {
LOG.warn("Could not determine the table and column family of the " +
"HFile path " + path);
return "unknown";
}
return splits[splits.length - 2];
}
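// Example (hypothetical path): for a store file at
// "/hbase/mytable/1a2b3c4d/cf1/5e6f7a8b", the second-to-last path component
// is the column family directory, so parseCfNameFromPath returns "cf1".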
public abstract boolean isFileInfoLoaded();
@Override
public String toString() {
return "reader=" + path.toString() +
(!isFileInfoLoaded()? "":
", compression=" + compressAlgo.getName() +
", inMemory=" + inMemory +
", firstKey=" + toStringFirstKey() +
", lastKey=" + toStringLastKey()) +
", avgKeyLen=" + avgKeyLen +
", avgValueLen=" + avgValueLen +
", entries=" + trailer.getEntryCount() +
", length=" + fileSize;
}
@Override
public long length() {
return fileSize;
}
/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
* nothing to clean up in a Scanner. Letting go of your references to the
* scanner is sufficient. NOTE: Do not use this overload of getScanner for
* compactions.
*
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is
* better for random reads, seek+read is better for scanning).
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
return getScanner(cacheBlocks, pread, false);
}
/**
* @return the first key in the file. May be null if file has no entries. Note
* that this is not the first row key, but rather the byte form of the
* first KeyValue.
*/
@Override
public byte [] getFirstKey() {
if (dataBlockIndexReader == null) {
throw new BlockIndexNotLoadedException();
}
return dataBlockIndexReader.isEmpty() ? null
: dataBlockIndexReader.getRootBlockKey(0);
}
/**
* TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
* patch goes in to eliminate {@link KeyValue} here.
*
* @return the first row key, or null if the file is empty.
*/
@Override
public byte[] getFirstRowKey() {
byte[] firstKey = getFirstKey();
if (firstKey == null)
return null;
return KeyValue.createKeyValueFromKey(firstKey).getRow();
}
/**
* TODO left from {@link HFile} version 1: move this to StoreFile after
* Ryan's patch goes in to eliminate {@link KeyValue} here.
*
* @return the last row key, or null if the file is empty.
*/
@Override
public byte[] getLastRowKey() {
byte[] lastKey = getLastKey();
if (lastKey == null)
return null;
return KeyValue.createKeyValueFromKey(lastKey).getRow();
}
/** @return number of KV entries in this HFile */
@Override
public long getEntries() {
return trailer.getEntryCount();
}
/** @return comparator */
@Override
public RawComparator<byte []> getComparator() {
return comparator;
}
/** @return compression algorithm */
@Override
public Compression.Algorithm getCompressionAlgorithm() {
return compressAlgo;
}
/**
* @return the total heap size of data and meta block indexes in bytes. Does
* not take into account non-root blocks of a multilevel data index.
*/
public long indexSize() {
return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
+ ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
: 0);
}
@Override
public String getName() {
return name;
}
@Override
public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
return dataBlockIndexReader;
}
@Override
public String getColumnFamilyName() {
return cfStatsPrefix;
}
@Override
public FixedFileTrailer getTrailer() {
return trailer;
}
@Override
public FileInfo loadFileInfo() throws IOException {
return fileInfo;
}
/**
* An exception thrown when an operation requiring a scanner to be seeked
* is invoked on a scanner that is not seeked.
*/
@SuppressWarnings("serial")
public static class NotSeekedException extends IllegalStateException {
public NotSeekedException() {
super("Not seeked to a key/value");
}
}
protected static abstract class Scanner implements HFileScanner {
protected HFile.Reader reader;
protected ByteBuffer blockBuffer;
protected boolean cacheBlocks;
protected final boolean pread;
protected final boolean isCompaction;
protected int currKeyLen;
protected int currValueLen;
protected int blockFetches;
public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
this.reader = reader;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
this.isCompaction = isCompaction;
}
@Override
public Reader getReader() {
return reader;
}
@Override
public boolean isSeeked(){
return blockBuffer != null;
}
@Override
public String toString() {
return "HFileScanner for reader " + String.valueOf(reader);
}
protected void assertSeeked() {
if (!isSeeked())
throw new NotSeekedException();
}
}
/** For testing */
HFileBlock.FSReader getUncachedBlockReader() {
return fsBlockReader;
}
}

org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java

@@ -0,0 +1,287 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
/**
* Common functionality needed by all versions of {@link HFile} writers.
*/
public abstract class AbstractHFileWriter implements HFile.Writer {
/** Key previously appended. Becomes the last key in the file. */
protected byte[] lastKeyBuffer = null;
protected int lastKeyOffset = -1;
protected int lastKeyLength = -1;
/** FileSystem stream to write into. */
protected FSDataOutputStream outputStream;
/** True if we opened the <code>outputStream</code> (and so will close it). */
protected final boolean closeOutputStream;
/** A "file info" block: a key-value map of file-wide metadata. */
protected FileInfo fileInfo = new HFile.FileInfo();
/** Number of uncompressed bytes we allow per block. */
protected final int blockSize;
/** Total # of key/value entries, i.e. how many times add() was called. */
protected long entryCount = 0;
/** Used for calculating the average key length. */
protected long totalKeyLength = 0;
/** Used for calculating the average value length. */
protected long totalValueLength = 0;
/** Total uncompressed bytes, maybe calculate a compression ratio later. */
protected long totalUncompressedBytes = 0;
/** Key comparator. Used to ensure we write in order. */
protected final RawComparator<byte[]> comparator;
/** Meta block names. */
protected List<byte[]> metaNames = new ArrayList<byte[]>();
/** {@link Writable}s representing meta block data. */
protected List<Writable> metaData = new ArrayList<Writable>();
/** The compression algorithm used. NONE if no compression. */
protected final Compression.Algorithm compressAlgo;
/** First key in a block. */
protected byte[] firstKeyInBlock = null;
/** May be null if we were passed a stream. */
protected final Path path;
/** Whether to cache key/value data blocks on write */
protected final boolean cacheDataBlocksOnWrite;
/** Whether to cache non-root index blocks on write */
protected final boolean cacheIndexBlocksOnWrite;
/** Block cache to optionally fill on write. */
protected BlockCache blockCache;
/** Configuration used for block cache initialization */
private Configuration conf;
/**
* Name for this object used when logging or in toString. Is either
* the result of a toString on stream or else toString of passed file Path.
*/
protected final String name;
public AbstractHFileWriter(Configuration conf,
FSDataOutputStream outputStream, Path path, int blockSize,
Compression.Algorithm compressAlgo, KeyComparator comparator) {
this.outputStream = outputStream;
this.path = path;
this.name = path != null ? path.getName() : outputStream.toString();
this.blockSize = blockSize;
this.compressAlgo = compressAlgo == null
? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo;
this.comparator = comparator != null ? comparator
: Bytes.BYTES_RAWCOMPARATOR;
closeOutputStream = path != null;
cacheDataBlocksOnWrite = conf.getBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY,
false);
cacheIndexBlocksOnWrite = HFileBlockIndex.shouldCacheOnWrite(conf);
this.conf = conf;
if (cacheDataBlocksOnWrite || cacheIndexBlocksOnWrite)
initBlockCache();
}
/**
* Add last bits of metadata to file info before it is written out.
*/
protected void finishFileInfo() throws IOException {
if (lastKeyBuffer != null) {
// Make a copy. The copy is stuffed into HMapWritable. Needs a clean
// byte buffer. Won't take a tuple.
fileInfo.append(FileInfo.LASTKEY, Arrays.copyOfRange(lastKeyBuffer,
lastKeyOffset, lastKeyOffset + lastKeyLength), false);
}
// Average key length.
int avgKeyLen =
entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
// Average value length.
int avgValueLen =
entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
}
/**
* Add to the file info. All added key/value pairs can be obtained using
* {@link HFile.Reader#loadFileInfo()}.
*
* @param k Key
* @param v Value
* @throws IOException in case the key or the value are invalid
*/
@Override
public void appendFileInfo(final byte[] k, final byte[] v)
throws IOException {
fileInfo.append(k, v, true);
}
/**
* Sets the file info offset in the trailer, finishes up populating fields in
* the file info, and writes the file info into the given data output. The
* reason the data output is not always {@link #outputStream} is that we store
* file info as a block in version 2.
*
* @param trailer fixed file trailer
* @param out the data output to write the file info to
* @throws IOException
*/
protected final void writeFileInfo(FixedFileTrailer trailer, DataOutput out)
throws IOException {
trailer.setFileInfoOffset(outputStream.getPos());
finishFileInfo();
fileInfo.write(out);
}
/**
* Checks that the given key does not violate the key order.
*
* @param key Key to check.
* @return true if the key is duplicate
* @throws IOException if the key or the key order is wrong
*/
protected boolean checkKey(final byte[] key, final int offset,
final int length) throws IOException {
boolean isDuplicateKey = false;
if (key == null || length <= 0) {
throw new IOException("Key cannot be null or empty");
}
if (length > HFile.MAXIMUM_KEY_LENGTH) {
throw new IOException("Key length " + length + " > "
+ HFile.MAXIMUM_KEY_LENGTH);
}
if (lastKeyBuffer != null) {
int keyComp = comparator.compare(lastKeyBuffer, lastKeyOffset,
lastKeyLength, key, offset, length);
if (keyComp > 0) {
throw new IOException("Added a key not lexically larger than"
+ " previous key="
+ Bytes.toStringBinary(key, offset, length)
+ ", lastkey="
+ Bytes.toStringBinary(lastKeyBuffer, lastKeyOffset,
lastKeyLength));
} else if (keyComp == 0) {
isDuplicateKey = true;
}
}
return isDuplicateKey;
}
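// Example (plain byte-string keys for illustration, using the default
// Bytes.BYTES_RAWCOMPARATOR): appending "row-1" after "row-2" makes checkKey
// throw an IOException; appending "row-2" again returns true (duplicate key);
// any strictly larger key returns false.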
/** Checks the given value for validity. */
protected void checkValue(final byte[] value, final int offset,
final int length) throws IOException {
if (value == null) {
throw new IOException("Value cannot be null");
}
}
/**
* @return Path or null if we were passed a stream rather than a Path.
*/
@Override
public Path getPath() {
return path;
}
@Override
public String toString() {
return "writer=" + (path != null ? path.toString() : null) + ", name="
+ name + ", compression=" + compressAlgo.getName();
}
/**
* Sets remaining trailer fields, writes the trailer to disk, and optionally
* closes the output stream.
*/
protected void finishClose(FixedFileTrailer trailer) throws IOException {
trailer.setMetaIndexCount(metaNames.size());
trailer.setTotalUncompressedBytes(totalUncompressedBytes);
trailer.setEntryCount(entryCount);
trailer.setCompressionCodec(compressAlgo);
trailer.serialize(outputStream);
if (closeOutputStream) {
outputStream.close();
outputStream = null;
}
}
public static Compression.Algorithm compressionByName(String algoName) {
if (algoName == null)
return HFile.DEFAULT_COMPRESSION_ALGORITHM;
return Compression.getCompressionAlgorithmByName(algoName);
}
/** A helper method to create HFile output streams in constructors */
protected static FSDataOutputStream createOutputStream(Configuration conf,
FileSystem fs, Path path) throws IOException {
return fs.create(path, FsPermission.getDefault(), true,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), fs.getDefaultBlockSize(),
null);
}
/** Initializes the block cache to use for cache-on-write */
protected void initBlockCache() {
if (blockCache == null) {
blockCache = StoreFile.getBlockCache(conf);
conf = null; // This is all we need configuration for.
}
}
}

org/apache/hadoop/hbase/io/hfile/BlockType.java

@@ -0,0 +1,168 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Various types of {@link HFile} blocks. Ordinal values of these enum constants
* must not be relied upon. The values in the enum appear in the order they
* appear in a version 2 {@link HFile}.
*/
public enum BlockType {
// Scanned block section
/** Data block, both versions */
DATA("DATABLK*"),
/** Version 2 leaf index block. Appears in the data block section */
LEAF_INDEX("IDXLEAF2"),
/** Bloom filter block, version 2 */
BLOOM_CHUNK("BLMFBLK2"),
// Non-scanned block section
/** Meta blocks */
META("METABLKc"),
/** Intermediate-level version 2 index in the non-data block section */
INTERMEDIATE_INDEX("IDXINTE2"),
// Load-on-open section.
/** Root index block, also used for the single-level meta index, version 2 */
ROOT_INDEX("IDXROOT2"),
/** File info, version 2 */
FILE_INFO("FILEINF2"),
/** Bloom filter metadata, version 2 */
BLOOM_META("BLMFMET2"),
// Trailer
/** Fixed file trailer, both versions (always just a magic string) */
TRAILER("TRABLK\"$"),
// Legacy blocks
/** Block index magic string in version 1 */
INDEX_V1("IDXBLK)+");
public static final int MAGIC_LENGTH = 8;
private final byte[] magic;
private BlockType(String magicStr) {
magic = Bytes.toBytes(magicStr);
assert magic.length == MAGIC_LENGTH;
}
public void writeToStream(OutputStream out) throws IOException {
out.write(magic);
}
public void write(DataOutput out) throws IOException {
out.write(magic);
}
public void write(ByteBuffer buf) {
buf.put(magic);
}
public static BlockType parse(byte[] buf, int offset, int length)
throws IOException {
if (length != MAGIC_LENGTH) {
throw new IOException("Magic record of invalid length: "
+ Bytes.toStringBinary(buf, offset, length));
}
for (BlockType blockType : values())
if (Bytes.compareTo(blockType.magic, 0, MAGIC_LENGTH, buf, offset,
MAGIC_LENGTH) == 0)
return blockType;
throw new IOException("Invalid HFile block magic: "
+ Bytes.toStringBinary(buf, offset, MAGIC_LENGTH));
}
public static BlockType read(DataInputStream in) throws IOException {
byte[] buf = new byte[MAGIC_LENGTH];
in.readFully(buf);
return parse(buf, 0, buf.length);
}
public static BlockType read(ByteBuffer buf) throws IOException {
BlockType blockType = parse(buf.array(),
buf.arrayOffset() + buf.position(),
Math.min(buf.limit() - buf.position(), MAGIC_LENGTH));
// If we got here, we have read exactly MAGIC_LENGTH bytes.
buf.position(buf.position() + MAGIC_LENGTH);
return blockType;
}
/**
* Put the magic record out to the specified byte array position.
*
* @param bytes the byte array
* @param offset position in the array
* @return incremented offset
*/
public int put(byte[] bytes, int offset) {
System.arraycopy(magic, 0, bytes, offset, MAGIC_LENGTH);
return offset + MAGIC_LENGTH;
}
/**
* Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
* stream and expects it to match this block type.
*/
public void readAndCheck(DataInputStream in) throws IOException {
byte[] buf = new byte[MAGIC_LENGTH];
in.readFully(buf);
if (Bytes.compareTo(buf, magic) != 0) {
throw new IOException("Invalid magic: expected "
+ Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
}
}
/**
* Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
* byte buffer and expects it to match this block type.
*/
public void readAndCheck(ByteBuffer in) throws IOException {
byte[] buf = new byte[MAGIC_LENGTH];
in.get(buf);
if (Bytes.compareTo(buf, magic) != 0) {
throw new IOException("Invalid magic: expected "
+ Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
}
}
}
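
A small sketch (illustrative wrapper class, not part of this commit) of the magic-record round trip the enum supports: write the 8-byte magic for a block type, then parse it back from a stream:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.hbase.io.hfile.BlockType;

public class BlockTypeMagicExample {
  public static void main(String[] args) throws Exception {
    // Serialize the 8-byte magic record of a data block.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BlockType.DATA.write(new DataOutputStream(baos));
    // Read it back; the magic bytes identify the block type.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    System.out.println(BlockType.read(in));   // prints DATA
  }
}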

File diff suppressed because it is too large.

File diff suppressed because it is too large.

org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java

@@ -0,0 +1,308 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
/**
* Implements pretty-printing functionality for {@link HFile}s.
*/
public class HFilePrettyPrinter {
private static final Log LOG = LogFactory.getLog(HFilePrettyPrinter.class);
private Options options = new Options();
private boolean verbose;
private boolean printValue;
private boolean printKey;
private boolean shouldPrintMeta;
private boolean printBlocks;
private boolean checkRow;
private boolean checkFamily;
private Configuration conf;
private List<Path> files = new ArrayList<Path>();
private int count;
private static final String FOUR_SPACES = " ";
public HFilePrettyPrinter() {
options.addOption("v", "verbose", false,
"Verbose output; emits file and meta data delimiters");
options.addOption("p", "printkv", false, "Print key/value pairs");
options.addOption("e", "printkey", false, "Print keys");
options.addOption("m", "printmeta", false, "Print meta data of file");
options.addOption("b", "printblocks", false, "Print block index meta data");
options.addOption("k", "checkrow", false,
"Enable row order check; looks for out-of-order keys");
options.addOption("a", "checkfamily", false, "Enable family check");
options.addOption("f", "file", true,
"File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
options.addOption("r", "region", true,
"Region to scan. Pass region name; e.g. '.META.,,1'");
}
public boolean parseOptions(String args[]) throws ParseException,
IOException {
if (args.length == 0) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("HFile", options, true);
return false;
}
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
verbose = cmd.hasOption("v");
printValue = cmd.hasOption("p");
printKey = cmd.hasOption("e") || printValue;
shouldPrintMeta = cmd.hasOption("m");
printBlocks = cmd.hasOption("b");
checkRow = cmd.hasOption("k");
checkFamily = cmd.hasOption("a");
if (cmd.hasOption("f")) {
files.add(new Path(cmd.getOptionValue("f")));
}
if (cmd.hasOption("r")) {
String regionName = cmd.getOptionValue("r");
byte[] rn = Bytes.toBytes(regionName);
byte[][] hri = HRegionInfo.parseRegionName(rn);
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
String enc = HRegionInfo.encodeRegionName(rn);
Path regionDir = new Path(tableDir, enc);
if (verbose)
System.out.println("region dir -> " + regionDir);
List<Path> regionFiles = HFile.getStoreFiles(FileSystem.get(conf),
regionDir);
if (verbose)
System.out.println("Number of region files found -> "
+ regionFiles.size());
if (verbose) {
int i = 1;
for (Path p : regionFiles) {
if (verbose)
System.out.println("Found file[" + i++ + "] -> " + p);
}
}
files.addAll(regionFiles);
}
return true;
}
/**
* Runs the command-line pretty-printer, and returns the desired command
* exit code (zero for success, non-zero for failure).
*/
public int run(String[] args) {
conf = HBaseConfiguration.create();
conf.set("fs.defaultFS",
conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
conf.set("fs.default.name",
conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
try {
if (!parseOptions(args))
return 1;
} catch (IOException ex) {
LOG.error("Error parsing command-line options", ex);
return 1;
} catch (ParseException ex) {
LOG.error("Error parsing command-line options", ex);
return 1;
}
// iterate over all files found
for (Path fileName : files) {
try {
processFile(fileName);
} catch (IOException ex) {
LOG.error("Error reading " + fileName, ex);
}
}
if (verbose || printKey) {
System.out.println("Scanned kv count -> " + count);
}
return 0;
}
private void processFile(Path file) throws IOException {
if (verbose)
System.out.println("Scanning -> " + file);
FileSystem fs = file.getFileSystem(conf);
if (!fs.exists(file)) {
System.err.println("ERROR, file doesn't exist: " + file);
return;
}
HFile.Reader reader = HFile.createReader(fs, file, null, false, false);
Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
if (verbose || printKey || checkRow || checkFamily) {
// scan over file and read key/value's and check if requested
HFileScanner scanner = reader.getScanner(false, false, false);
scanner.seekTo();
scanKeysValues(file, scanner);
}
// print meta data
if (shouldPrintMeta) {
printMeta(reader, fileInfo);
}
if (printBlocks) {
System.out.println("Block Index:");
System.out.println(reader.getDataBlockIndexReader());
}
reader.close();
}
private void scanKeysValues(Path file, HFileScanner scanner)
throws IOException {
KeyValue pkv = null;
do {
KeyValue kv = scanner.getKeyValue();
// dump key value
if (printKey) {
System.out.print("K: " + kv);
if (printValue) {
System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
}
System.out.println();
}
// check if rows are in order
if (checkRow && pkv != null) {
if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
System.err.println("WARNING, previous row is greater then"
+ " current row\n\tfilename -> " + file + "\n\tprevious -> "
+ Bytes.toStringBinary(pkv.getKey()) + "\n\tcurrent -> "
+ Bytes.toStringBinary(kv.getKey()));
}
}
// check if families are consistent
if (checkFamily) {
String fam = Bytes.toString(kv.getFamily());
if (!file.toString().contains(fam)) {
System.err.println("WARNING, filename does not match kv family,"
+ "\n\tfilename -> " + file + "\n\tkeyvalue -> "
+ Bytes.toStringBinary(kv.getKey()));
}
if (pkv != null
&& !Bytes.equals(pkv.getFamily(), kv.getFamily())) {
System.err.println("WARNING, previous kv has different family"
+ " compared to current key\n\tfilename -> " + file
+ "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey())
+ "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
}
}
pkv = kv;
++count;
} while (scanner.next());
}
/**
* Format a string of the form "k1=v1, k2=v2, ..." into separate lines
* with a four-space indentation.
*/
private static String asSeparateLines(String keyValueStr) {
return keyValueStr.replaceAll(", ([a-zA-Z]+=)",
",\n" + FOUR_SPACES + "$1");
}
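// Example: asSeparateLines("avgKeyLen=12, avgValueLen=34, entries=56") returns
// "avgKeyLen=12,\n    avgValueLen=34,\n    entries=56", i.e. each key=value
// pair after the first moves to its own four-space-indented line.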
private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo)
throws IOException {
System.out.println("Block index size as per heapsize: "
+ reader.indexSize());
System.out.println(asSeparateLines(reader.toString()));
System.out.println("Trailer:\n "
+ asSeparateLines(reader.getTrailer().toString()));
System.out.println("Fileinfo:");
for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
System.out.print(FOUR_SPACES + Bytes.toString(e.getKey()) + " = ");
if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY")) == 0) {
long seqid = Bytes.toLong(e.getValue());
System.out.println(seqid);
} else if (Bytes.compareTo(e.getKey(), Bytes.toBytes("TIMERANGE")) == 0) {
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
Writables.copyWritable(e.getValue(), timeRangeTracker);
System.out.println(timeRangeTracker.getMinimumTimestamp() + "...."
+ timeRangeTracker.getMaximumTimestamp());
} else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0
|| Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
System.out.println(Bytes.toInt(e.getValue()));
} else {
System.out.println(Bytes.toStringBinary(e.getValue()));
}
}
System.out.println("Mid-key: " + Bytes.toStringBinary(reader.midkey()));
// Printing bloom information
DataInput bloomMeta = reader.getBloomFilterMetadata();
BloomFilter bloomFilter = null;
if (bloomMeta != null)
bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
System.out.println("Bloom filter:");
if (bloomFilter != null) {
System.out.println(FOUR_SPACES + bloomFilter.toString().replaceAll(
ByteBloomFilter.STATS_RECORD_SEP, "\n" + FOUR_SPACES));
} else {
System.out.println(FOUR_SPACES + "Not present");
}
}
}
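
A sketch of invoking the pretty-printer programmatically (the wrapper class and HDFS path are illustrative; it assumes an hbase-site.xml with hbase.rootdir is on the classpath, since run() reads HBASE_DIR from the configuration):

import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;

public class PrettyPrinterExample {
  public static void main(String[] args) {
    HFilePrettyPrinter printer = new HFilePrettyPrinter();
    // Print key/value pairs and file metadata for one store file.
    int exitCode = printer.run(new String[] {
        "-p", "-m", "-f",
        "hdfs://namenode:9000/hbase/mytable/1a2b3c4d/cf1/5e6f7a8b" });  // illustrative path
    System.exit(exitCode);
  }
}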

org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java

@@ -0,0 +1,73 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutput;
import java.io.IOException;
/**
* A way to write "inline" blocks into an {@link HFile}. Inline blocks are
* interspersed with data blocks. For example, Bloom filter chunks and
* leaf-level blocks of a multi-level block index are stored as inline blocks.
*/
public interface InlineBlockWriter {
/**
* Determines whether there is a new block to be written out.
*
* @param closing
* whether the file is being closed, in which case we need to write
* out all available data and not wait to accumulate another block
*/
boolean shouldWriteBlock(boolean closing);
/**
* Writes the block to the provided stream. Must not write any magic records.
* Called only if {@link #shouldWriteBlock(boolean)} returned true.
*
* @param out
* a stream (usually a compressing stream) to write the block to
*/
void writeInlineBlock(DataOutput out) throws IOException;
/**
* Called after a block has been written, and its offset, raw size, and
* compressed size have been determined. Can be used to add an entry to a
* block index. If this type of inline blocks needs a block index, the inline
* block writer is responsible for maintaining it.
*
* @param offset the offset of the block in the stream
* @param onDiskSize the on-disk size of the block
* @param uncompressedSize the uncompressed size of the block
*/
void blockWritten(long offset, int onDiskSize, int uncompressedSize);
/**
* The type of blocks this block writer produces.
*/
BlockType getInlineBlockType();
/**
* @return true if inline blocks produced by this writer should be cached
*/
boolean cacheOnWrite();
}
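
The concrete producer of inline blocks in this commit is CompoundBloomFilterWriter (below). As a minimal sketch of the contract only (hypothetical class; the choice of BlockType.META is arbitrary for illustration and not how HBase assigns block types), an implementation that emits one small block when the file is closed might look like:

import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;

public class ClosingMarkerInlineBlockWriter implements InlineBlockWriter {
  private boolean written = false;
  private long markerOffset = -1;   // filled in by blockWritten()

  @Override
  public boolean shouldWriteBlock(boolean closing) {
    // Only one block, emitted when the file is being closed.
    return closing && !written;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    out.writeUTF("marker");         // block payload only; no magic record
    written = true;
  }

  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    markerOffset = offset;          // a real writer would index this entry
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.META;          // arbitrary for this sketch
  }

  @Override
  public boolean cacheOnWrite() {
    return false;
  }

  public long getMarkerOffset() {
    return markerOffset;
  }
}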

org/apache/hadoop/hbase/util/BloomFilterBase.java

@@ -0,0 +1,56 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.io.RawComparator;
/**
* Common Bloom filter methods required at read and write time.
*/
public interface BloomFilterBase {
/**
* @return The number of keys added to the bloom
*/
long getKeyCount();
/**
* @return The max number of keys that can be inserted
* to maintain the desired error rate
*/
long getMaxKeys();
/**
* @return Size of the bloom, in bytes
*/
long getByteSize();
/**
* Create a key for a row-column Bloom filter.
*/
byte[] createBloomKey(byte[] rowBuf, int rowOffset, int rowLen,
byte[] qualBuf, int qualOffset, int qualLen);
/**
* @return Bloom key comparator
*/
RawComparator<byte[]> getComparator();
}

org/apache/hadoop/hbase/util/BloomFilterFactory.java

@@ -0,0 +1,208 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.DataInput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
/**
* Handles Bloom filter initialization based on configuration and serialized
* metadata in the reader and writer of {@link StoreFile}.
*/
public final class BloomFilterFactory {
private static final Log LOG =
LogFactory.getLog(BloomFilterFactory.class.getName());
/** This class should not be instantiated. */
private BloomFilterFactory() {}
/**
* Specifies the target error rate to use when selecting the number of keys
* per Bloom filter.
*/
public static final String IO_STOREFILE_BLOOM_ERROR_RATE =
"io.storefile.bloom.error.rate";
/**
* Maximum folding factor allowed. The Bloom filter will be shrunk by a
* factor of up to 2 to the power of this value if we oversize it initially.
*/
public static final String IO_STOREFILE_BLOOM_MAX_FOLD =
"io.storefile.bloom.max.fold";
/**
* For default (single-block) Bloom filters this specifies the maximum number
* of keys.
*/
public static final String IO_STOREFILE_BLOOM_MAX_KEYS =
"io.storefile.bloom.max.keys";
/** Master switch to enable Bloom filters */
public static final String IO_STOREFILE_BLOOM_ENABLED =
"io.storefile.bloom.enabled";
/**
* Target Bloom block size. Bloom filter blocks of approximately this size
* are interleaved with data blocks.
*/
public static final String IO_STOREFILE_BLOOM_BLOCK_SIZE =
"io.storefile.bloom.block.size";
/** Whether to cache compound Bloom filter blocks on write */
public static final String IO_STOREFILE_BLOOM_CACHE_ON_WRITE =
"io.storefile.bloom.cacheonwrite";
/** Maximum number of times a Bloom filter can be "folded" if oversized */
private static final int MAX_ALLOWED_FOLD_FACTOR = 7;
/**
* Instantiates the correct Bloom filter class based on the version provided
* in the meta block data.
*
* @param meta the byte array holding the Bloom filter's metadata, including
* version information
* @param reader the {@link HFile} reader to use to lazily load Bloom filter
* blocks
* @return an instance of the correct type of Bloom filter
* @throws IllegalArgumentException
*/
public static BloomFilter
createFromMeta(DataInput meta, HFile.Reader reader)
throws IllegalArgumentException, IOException {
int version = meta.readInt();
switch (version) {
case ByteBloomFilter.VERSION:
// This is only possible in a version 1 HFile; version 1 Bloom filters
// always use the raw byte comparator.
return new ByteBloomFilter(meta);
case CompoundBloomFilterBase.VERSION:
return new CompoundBloomFilter(meta, reader);
default:
throw new IllegalArgumentException(
"Bad bloom filter format version " + version
);
}
}
/**
* @return true if Bloom filters are enabled in the given configuration
*/
public static boolean isBloomEnabled(Configuration conf) {
return conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true);
}
public static float getErrorRate(Configuration conf) {
return conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
}
/**
* Creates a new Bloom filter at the time of
* {@link org.apache.hadoop.hbase.regionserver.StoreFile} writing.
*
* @param conf
* @param bloomType
* @param maxKeys an estimate of the number of keys we expect to insert.
* Irrelevant if compound Bloom filters are enabled.
* @param writer the HFile writer
* @return the new Bloom filter, or null if Bloom filters are disabled or
* one could not be created.
*/
public static BloomFilterWriter createBloomAtWrite(Configuration conf,
BloomType bloomType, int maxKeys, HFile.Writer writer) {
if (!isBloomEnabled(conf)) {
LOG.info("Bloom filters are disabled by configuration for "
+ writer.getPath()
+ (conf == null ? " (configuration is null)" : ""));
return null;
} else if (bloomType == BloomType.NONE) {
LOG.info("Bloom filter is turned off for the column family");
return null;
}
float err = getErrorRate(conf);
// In case of row/column Bloom filter lookups, each lookup is an OR of two
// separate lookups. Therefore, if each lookup's false positive rate is p,
// the resulting false positive rate is err = 1 - (1 - p)^2, and
// p = 1 - sqrt(1 - err).
if (bloomType == BloomType.ROWCOL) {
err = (float) (1 - Math.sqrt(1 - err));
}
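// Worked example: with the default err = 0.01, the per-lookup rate becomes
// p = 1 - sqrt(1 - 0.01), roughly 0.005, so the combined rate 1 - (1 - p)^2
// comes back to roughly 0.01.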
int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD,
MAX_ALLOWED_FOLD_FACTOR);
if (HFile.getFormatVersion(conf) > HFile.MIN_FORMAT_VERSION) {
// In case of compound Bloom filters we ignore the maxKeys hint.
CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
getBloomBlockSize(conf), err, Hash.getHashType(conf), maxFold,
cacheChunksOnWrite(conf), bloomType == BloomType.ROWCOL
? KeyValue.KEY_COMPARATOR : Bytes.BYTES_RAWCOMPARATOR);
writer.addInlineBlockWriter(bloomWriter);
return bloomWriter;
} else {
// A single-block Bloom filter. Only used when testing HFile format
// version 1.
int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS,
128 * 1000 * 1000);
if (maxKeys <= 0) {
LOG.warn("Invalid maximum number of keys specified: " + maxKeys
+ ", not using Bloom filter");
return null;
} else if (maxKeys < tooBig) {
BloomFilterWriter bloom = new ByteBloomFilter((int) maxKeys, err,
Hash.getHashType(conf), maxFold);
bloom.allocBloom();
return bloom;
} else {
LOG.debug("Skipping bloom filter because max keysize too large: "
+ maxKeys);
}
}
return null;
}
/** @return the compound Bloom filter block size from the configuration */
public static int getBloomBlockSize(Configuration conf) {
return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
}
/** @return whether to cache compound Bloom filter chunks on write */
public static boolean cacheChunksOnWrite(Configuration conf) {
return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
}
}
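
A configuration-level sketch (illustrative values, hypothetical wrapper class) showing how the keys and accessors defined above fit together:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.BloomFilterFactory;

public class BloomConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Master switch plus the compound Bloom filter tuning knobs defined above.
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
    conf.set(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, "0.005");
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, 64 * 1024);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_CACHE_ON_WRITE, true);

    System.out.println("enabled      = " + BloomFilterFactory.isBloomEnabled(conf));
    System.out.println("error rate   = " + BloomFilterFactory.getErrorRate(conf));
    System.out.println("block size   = " + BloomFilterFactory.getBloomBlockSize(conf));
    System.out.println("cache chunks = " + BloomFilterFactory.cacheChunksOnWrite(conf));
  }
}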

org/apache/hadoop/hbase/util/CompoundBloomFilter.java

@@ -0,0 +1,175 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.io.RawComparator;
/**
* A Bloom filter implementation built on top of {@link ByteBloomFilter},
* encapsulating a set of fixed-size Bloom filters written out at the time of
* {@link org.apache.hadoop.hbase.io.hfile.HFile} generation into the data
* block stream, and loaded on demand at query time. This class only provides
* reading capabilities.
*/
public class CompoundBloomFilter extends CompoundBloomFilterBase
implements BloomFilter {
/** Used to load chunks on demand */
private HFile.Reader reader;
private HFileBlockIndex.BlockIndexReader index;
private int hashCount;
private Hash hash;
private long[] numQueriesPerChunk;
private long[] numPositivesPerChunk;
/**
* De-serialization for compound Bloom filter metadata. Must be consistent
* with what {@link CompoundBloomFilterWriter} does.
*
* @param meta serialized Bloom filter metadata without any magic blocks
* @throws IOException
*/
public CompoundBloomFilter(DataInput meta, HFile.Reader reader)
throws IOException {
this.reader = reader;
totalByteSize = meta.readLong();
hashCount = meta.readInt();
hashType = meta.readInt();
totalKeyCount = meta.readLong();
totalMaxKeys = meta.readLong();
numChunks = meta.readInt();
comparator = FixedFileTrailer.createComparator(
Bytes.toString(Bytes.readByteArray(meta)));
hash = Hash.getInstance(hashType);
if (hash == null) {
throw new IllegalArgumentException("Invalid hash type: " + hashType);
}
index = new HFileBlockIndex.BlockIndexReader(comparator, 1);
index.readRootIndex(meta, numChunks);
}
@Override
public boolean contains(byte[] key, int keyOffset, int keyLength,
ByteBuffer bloom) {
// We try to store the result in this variable so we can update stats for
// testing, but if loading the Bloom block fails we throw an exception.
boolean result;
int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
if (block < 0) {
result = false; // This key is not in the file.
} else {
HFileBlock bloomBlock;
try {
// We cache the block and use a positional read.
bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
index.getRootBlockDataSize(block), true, true, false);
} catch (IOException ex) {
// The Bloom filter is broken, turn it off.
throw new IllegalArgumentException(
"Failed to load Bloom block for key "
+ Bytes.toStringBinary(key, keyOffset, keyLength), ex);
}
ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
result = ByteBloomFilter.contains(key, keyOffset, keyLength,
bloomBuf.array(), bloomBuf.arrayOffset() + HFileBlock.HEADER_SIZE,
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
}
if (numQueriesPerChunk != null && block >= 0) {
// Update statistics. Only used in unit tests.
++numQueriesPerChunk[block];
if (result)
++numPositivesPerChunk[block];
}
return result;
}
public boolean supportsAutoLoading() {
return true;
}
public int getNumChunks() {
return numChunks;
}
@Override
public RawComparator<byte[]> getComparator() {
return comparator;
}
public void enableTestingStats() {
numQueriesPerChunk = new long[numChunks];
numPositivesPerChunk = new long[numChunks];
}
public String formatTestingStats() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numChunks; ++i) {
sb.append("chunk #");
sb.append(i);
sb.append(": queries=");
sb.append(numQueriesPerChunk[i]);
sb.append(", positives=");
sb.append(numPositivesPerChunk[i]);
sb.append(", positiveRatio=");
sb.append(numPositivesPerChunk[i] * 1.0 / numQueriesPerChunk[i]);
sb.append(";\n");
}
return sb.toString();
}
public long getNumQueriesForTesting(int chunk) {
return numQueriesPerChunk[chunk];
}
public long getNumPositivesForTesting(int chunk) {
return numPositivesPerChunk[chunk];
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(ByteBloomFilter.formatStats(this));
sb.append(ByteBloomFilter.STATS_RECORD_SEP +
"Number of chunks: " + numChunks);
sb.append(ByteBloomFilter.STATS_RECORD_SEP +
"Comparator: " + comparator.getClass().getSimpleName());
return sb.toString();
}
}

org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java

@@ -0,0 +1,95 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.io.RawComparator;
public class CompoundBloomFilterBase implements BloomFilterBase {
/**
* At read time, the total number of chunks. At write time, the number of
* chunks created so far. The first chunk has an ID of 0, and the current
* chunk has the ID of numChunks - 1.
*/
protected int numChunks;
/**
* The Bloom filter version. There used to be a DynamicByteBloomFilter which
* had version 2.
*/
public static final int VERSION = 3;
/** Target error rate for configuring the filter and for information */
protected float errorRate;
/** The total number of keys in all chunks */
protected long totalKeyCount;
protected long totalByteSize;
protected long totalMaxKeys;
/** Hash function type to use, as defined in {@link Hash} */
protected int hashType;
/** Comparator used to compare Bloom filter keys */
protected RawComparator<byte[]> comparator;
@Override
public long getMaxKeys() {
return totalMaxKeys;
}
@Override
public long getKeyCount() {
return totalKeyCount;
}
@Override
public long getByteSize() {
return totalByteSize;
}
private static final byte[] DUMMY = new byte[0];
/**
* Prepare an ordered pair of row and qualifier to be compared using
* {@link KeyValue.KeyComparator}. This is only used for row-column Bloom
* filters.
*/
@Override
public byte[] createBloomKey(byte[] row, int roffset, int rlength,
byte[] qualifier, int qoffset, int qlength) {
if (qualifier == null)
qualifier = DUMMY;
// Make sure this does not specify a timestamp so that the default maximum
// (most recent) timestamp is used.
KeyValue kv = KeyValue.createFirstOnRow(row, roffset, rlength, DUMMY, 0, 0,
qualifier, qoffset, qlength);
return kv.getKey();
}
@Override
public RawComparator<byte[]> getComparator() {
return comparator;
}
}

org/apache/hadoop/hbase/util/CompoundBloomFilterWriter.java

@@ -0,0 +1,277 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
/**
* Adds methods required for writing a compound Bloom filter to the data
* section of an {@link org.apache.hadoop.hbase.io.hfile.HFile} to the
* {@link CompoundBloomFilter} class.
*/
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
implements BloomFilterWriter, InlineBlockWriter {
protected static final Log LOG =
LogFactory.getLog(CompoundBloomFilterWriter.class);
/** The current chunk being written to */
private ByteBloomFilter chunk;
/** Previous chunk, so that we can create another similar chunk */
private ByteBloomFilter prevChunk;
/** Maximum fold factor */
private int maxFold;
/** The size of individual Bloom filter chunks to create */
private int chunkByteSize;
/** A Bloom filter chunk enqueued for writing */
private static class ReadyChunk {
int chunkId;
byte[] firstKey;
ByteBloomFilter chunk;
}
private Queue<ReadyChunk> readyChunks = new LinkedList<ReadyChunk>();
/** The first key in the current Bloom filter chunk. */
private byte[] firstKeyInChunk = null;
private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
new HFileBlockIndex.BlockIndexWriter();
/** Whether to cache-on-write compound Bloom filter chunks */
private boolean cacheOnWrite;
/**
* @param chunkByteSizeHint
* each chunk's size in bytes. The real chunk size might be different
* as required by the fold factor.
* @param errorRate
* target false positive rate
* @param hashType
* hash function type to use
* @param maxFold
* maximum degree of folding allowed
* @param cacheOnWrite
* whether to cache the Bloom filter chunk blocks on write
* @param comparator
* comparator for Bloom filter keys
*/
public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
int hashType, int maxFold, boolean cacheOnWrite,
RawComparator<byte[]> comparator) {
chunkByteSize = ByteBloomFilter.computeFoldableByteSize(
chunkByteSizeHint * 8, maxFold);
this.errorRate = errorRate;
this.hashType = hashType;
this.maxFold = maxFold;
this.cacheOnWrite = cacheOnWrite;
this.comparator = comparator;
}
@Override
public boolean shouldWriteBlock(boolean closing) {
enqueueReadyChunk(closing);
return !readyChunks.isEmpty();
}
/**
* Enqueue the current chunk if it is ready to be written out.
*
* @param closing true if we are closing the file, so we do not expect new
* keys to show up
*/
private void enqueueReadyChunk(boolean closing) {
if (chunk == null ||
(chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
return;
}
if (firstKeyInChunk == null) {
throw new NullPointerException("Trying to enqueue a chunk, " +
"but first key is null: closing=" + closing + ", keyCount=" +
chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
}
ReadyChunk readyChunk = new ReadyChunk();
readyChunk.chunkId = numChunks - 1;
readyChunk.chunk = chunk;
readyChunk.firstKey = firstKeyInChunk;
readyChunks.add(readyChunk);
long prevMaxKeys = chunk.getMaxKeys();
long prevByteSize = chunk.getByteSize();
chunk.compactBloom();
if (LOG.isDebugEnabled() && prevByteSize != chunk.getByteSize()) {
LOG.debug("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
+ prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
+ chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
+ " bytes]");
}
totalMaxKeys += chunk.getMaxKeys();
totalByteSize += chunk.getByteSize();
firstKeyInChunk = null;
prevChunk = chunk;
chunk = null;
}
/**
* Adds a Bloom filter key. This key must be greater than the previous key,
* as defined by the comparator this compound Bloom filter is configured
* with. For efficiency, key monotonicity is not checked here. See
* {@link org.apache.hadoop.hbase.regionserver.StoreFile.Writer#append(
* org.apache.hadoop.hbase.KeyValue)} for the details of deduplication.
*/
@Override
public void add(byte[] bloomKey, int keyOffset, int keyLength) {
if (bloomKey == null)
throw new NullPointerException();
enqueueReadyChunk(false);
if (chunk == null) {
if (firstKeyInChunk != null) {
throw new IllegalStateException("First key in chunk already set: "
+ Bytes.toStringBinary(firstKeyInChunk));
}
firstKeyInChunk = Arrays.copyOfRange(bloomKey, keyOffset, keyOffset
+ keyLength);
if (prevChunk == null) {
// First chunk
chunk = ByteBloomFilter.createBySize(chunkByteSize, errorRate,
hashType, maxFold);
} else {
// Use the same parameters as the last chunk, but a new array and
// a zero key count.
chunk = prevChunk.createAnother();
}
if (chunk.getKeyCount() != 0) {
throw new IllegalStateException("keyCount=" + chunk.getKeyCount()
+ " > 0");
}
chunk.allocBloom();
++numChunks;
}
chunk.add(bloomKey, keyOffset, keyLength);
++totalKeyCount;
}
@Override
public void writeInlineBlock(DataOutput out) throws IOException {
// We don't remove the chunk from the queue here, because we might need it
// again for cache-on-write.
ReadyChunk readyChunk = readyChunks.peek();
ByteBloomFilter readyChunkBloom = readyChunk.chunk;
readyChunkBloom.getDataWriter().write(out);
}
@Override
public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
ReadyChunk readyChunk = readyChunks.remove();
bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
}
@Override
public BlockType getInlineBlockType() {
return BlockType.BLOOM_CHUNK;
}
private class MetaWriter implements Writable {
protected MetaWriter() {}
@Override
public void readFields(DataInput in) throws IOException {
throw new IOException("Cant read with this class.");
}
/**
* This is modeled after {@link ByteBloomFilter.MetaWriter} for simplicity,
* although the two metadata formats do not have to be consistent. This
* does have to be consistent with how {@link
* CompoundBloomFilter#CompoundBloomFilter(DataInput,
* org.apache.hadoop.hbase.io.hfile.HFile.Reader)} reads fields.
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(VERSION);
out.writeLong(getByteSize());
out.writeInt(prevChunk.getHashCount());
out.writeInt(prevChunk.getHashType());
out.writeLong(getKeyCount());
out.writeLong(getMaxKeys());
// Fields that don't have equivalents in ByteBloomFilter.
out.writeInt(numChunks);
Bytes.writeByteArray(out,
Bytes.toBytes(comparator.getClass().getName()));
// Write a single-level index without compression or block header.
bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
}
}
@Override
public Writable getMetaWriter() {
return new MetaWriter();
}
@Override
public void compactBloom() {
}
@Override
public void allocBloom() {
// Nothing happens here. All allocation happens on demand.
}
@Override
public Writable getDataWriter() {
return null;
}
@Override
public boolean cacheOnWrite() {
return cacheOnWrite;
}
}