HBASE-3857 New files

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1153640 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-08-03 20:18:03 +00:00
parent b354e4ea98
commit 8e2b477566
6 changed files with 2869 additions and 0 deletions


@@ -0,0 +1,475 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
import com.google.common.io.NullOutputStream;
/**
* The {@link HFile} has a fixed trailer which contains offsets to other
* variable parts of the file. Also includes basic metadata on this file. The
* trailer size is fixed within a given {@link HFile} format version only, but
* we always store the version number as the last four-byte integer of the file.
*/
public class FixedFileTrailer {
private static final Log LOG = LogFactory.getLog(FixedFileTrailer.class);
/**
* We store the comparator class name as a fixed-length field in the trailer.
*/
private static final int MAX_COMPARATOR_NAME_LENGTH = 128;
/**
* Offset to the fileinfo data, a small block of vitals. Necessary in v1 but
* only potentially useful for pretty-printing in v2.
*/
private long fileInfoOffset;
/**
* In version 1, the offset to the data block index. Starting from version 2,
* the meaning of this field is the offset to the section of the file that
* should be loaded at the time the file is being opened, and as of the time
* of writing, this happens to be the offset of the file info section.
*/
private long loadOnOpenDataOffset;
/** The number of entries in the root data index. */
private int dataIndexCount;
/** Total uncompressed size of all blocks of the data index */
private long uncompressedDataIndexSize;
/** The number of entries in the meta index */
private int metaIndexCount;
/** The total uncompressed size of keys/values stored in the file. */
private long totalUncompressedBytes;
/**
* The number of key/value pairs in the file. This field was int in version 1,
* but is now long.
*/
private long entryCount;
/** The compression codec used for all blocks. */
private Compression.Algorithm compressionCodec = Compression.Algorithm.NONE;
/**
* The number of levels in the potentially multi-level data index. Used from
* version 2 onwards.
*/
private int numDataIndexLevels;
/** The offset of the first data block. */
private long firstDataBlockOffset;
/**
* It is guaranteed that no key/value data blocks start after this offset in
* the file.
*/
private long lastDataBlockOffset;
/** Raw key comparator class name in version 2 */
private String comparatorClassName = RawComparator.class.getName();
/** The {@link HFile} format version. */
private final int version;
FixedFileTrailer(int version) {
this.version = version;
HFile.checkFormatVersion(version);
}
private static int[] computeTrailerSizeByVersion() {
int versionToSize[] = new int[HFile.MAX_FORMAT_VERSION + 1];
for (int version = MIN_FORMAT_VERSION;
version <= MAX_FORMAT_VERSION;
++version) {
FixedFileTrailer fft = new FixedFileTrailer(version);
DataOutputStream dos = new DataOutputStream(new NullOutputStream());
try {
fft.serialize(dos);
} catch (IOException ex) {
// The above has no reason to fail.
throw new RuntimeException(ex);
}
versionToSize[version] = dos.size();
}
return versionToSize;
}
private static int getMaxTrailerSize() {
int maxSize = 0;
for (int version = MIN_FORMAT_VERSION;
version <= MAX_FORMAT_VERSION;
++version)
maxSize = Math.max(getTrailerSize(version), maxSize);
return maxSize;
}
private static final int TRAILER_SIZE[] = computeTrailerSizeByVersion();
private static final int MAX_TRAILER_SIZE = getMaxTrailerSize();
static int getTrailerSize(int version) {
return TRAILER_SIZE[version];
}
public int getTrailerSize() {
return getTrailerSize(version);
}
/**
* Write the trailer to a data stream. We support writing version 1 for
* testing and for determining the version 1 trailer size. It also makes it
* easy to see which fields changed in version 2.
*
* @param outputStream
* @throws IOException
*/
void serialize(DataOutputStream outputStream) throws IOException {
HFile.checkFormatVersion(version);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput baosDos = new DataOutputStream(baos);
BlockType.TRAILER.write(baosDos);
baosDos.writeLong(fileInfoOffset);
baosDos.writeLong(loadOnOpenDataOffset);
baosDos.writeInt(dataIndexCount);
if (version == 1) {
// This used to be metaIndexOffset, but it was not used in version 1.
baosDos.writeLong(0);
} else {
baosDos.writeLong(uncompressedDataIndexSize);
}
baosDos.writeInt(metaIndexCount);
baosDos.writeLong(totalUncompressedBytes);
if (version == 1) {
baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
} else {
// This field is long from version 2 onwards.
baosDos.writeLong(entryCount);
}
baosDos.writeInt(compressionCodec.ordinal());
if (version > 1) {
baosDos.writeInt(numDataIndexLevels);
baosDos.writeLong(firstDataBlockOffset);
baosDos.writeLong(lastDataBlockOffset);
Bytes.writeStringFixedSize(baosDos, comparatorClassName,
MAX_COMPARATOR_NAME_LENGTH);
}
baosDos.writeInt(version);
outputStream.write(baos.toByteArray());
}
/**
* Deserialize the fixed file trailer from the given stream. The version needs
* to already be specified. Make sure this is consistent with
* {@link #serialize(DataOutputStream)}.
*
* @param inputStream
* @param version
* @throws IOException
*/
void deserialize(DataInputStream inputStream) throws IOException {
HFile.checkFormatVersion(version);
BlockType.TRAILER.readAndCheck(inputStream);
fileInfoOffset = inputStream.readLong();
loadOnOpenDataOffset = inputStream.readLong();
dataIndexCount = inputStream.readInt();
if (version == 1) {
inputStream.readLong(); // Read and skip metaIndexOffset.
} else {
uncompressedDataIndexSize = inputStream.readLong();
}
metaIndexCount = inputStream.readInt();
totalUncompressedBytes = inputStream.readLong();
entryCount = version == 1 ? inputStream.readInt() : inputStream.readLong();
compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
if (version > 1) {
numDataIndexLevels = inputStream.readInt();
firstDataBlockOffset = inputStream.readLong();
lastDataBlockOffset = inputStream.readLong();
comparatorClassName =
Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
}
expectVersion(inputStream.readInt());
}
private void append(StringBuilder sb, String s) {
if (sb.length() > 0)
sb.append(", ");
sb.append(s);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
append(sb, "fileinfoOffset=" + fileInfoOffset);
append(sb, "loadOnOpenDataOffset=" + loadOnOpenDataOffset);
append(sb, "dataIndexCount=" + dataIndexCount);
append(sb, "metaIndexCount=" + metaIndexCount);
append(sb, "totalUncomressedBytes=" + totalUncompressedBytes);
append(sb, "entryCount=" + entryCount);
append(sb, "compressionCodec=" + compressionCodec);
if (version == 2) {
append(sb, "uncompressedDataIndexSize=" + uncompressedDataIndexSize);
append(sb, "numDataIndexLevels=" + numDataIndexLevels);
append(sb, "firstDataBlockOffset=" + firstDataBlockOffset);
append(sb, "lastDataBlockOffset=" + lastDataBlockOffset);
append(sb, "comparatorClassName=" + comparatorClassName);
}
append(sb, "version=" + version);
return sb.toString();
}
/**
* Reads a file trailer from the given file.
*
* @param istream the input stream with the ability to seek. Does not have to
* be buffered, as only one read operation is made.
* @param fileSize the file size. Can be obtained using
* {@link org.apache.hadoop.fs.FileSystem#getFileStatus(
* org.apache.hadoop.fs.Path)}.
* @return the fixed file trailer read
* @throws IOException if failed to read from the underlying stream, or the
* trailer is corrupted, or the version of the trailer is
* unsupported
*/
public static FixedFileTrailer readFromStream(FSDataInputStream istream,
long fileSize) throws IOException {
int bufferSize = MAX_TRAILER_SIZE;
long seekPoint = fileSize - bufferSize;
if (seekPoint < 0) {
// It is hard to imagine such a small HFile.
seekPoint = 0;
bufferSize = (int) fileSize;
}
istream.seek(seekPoint);
ByteBuffer buf = ByteBuffer.allocate(bufferSize);
istream.readFully(buf.array(), buf.arrayOffset(),
buf.arrayOffset() + buf.limit());
// Read the version from the last int of the file.
buf.position(buf.limit() - Bytes.SIZEOF_INT);
int version = buf.getInt();
try {
HFile.checkFormatVersion(version);
} catch (IllegalArgumentException iae) {
// In this context, an invalid version might indicate a corrupt HFile.
throw new IOException(iae);
}
int trailerSize = getTrailerSize(version);
FixedFileTrailer fft = new FixedFileTrailer(version);
fft.deserialize(new DataInputStream(new ByteArrayInputStream(buf.array(),
buf.arrayOffset() + bufferSize - trailerSize, trailerSize)));
return fft;
}
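// Illustrative usage (a sketch, not part of this class): the Configuration,
// FileSystem and Path values below are assumptions made for the example.
//
//   FileSystem fs = FileSystem.get(conf);
//   Path hfilePath = new Path("/tmp/example.hfile");
//   long fileSize = fs.getFileStatus(hfilePath).getLen();
//   FSDataInputStream in = fs.open(hfilePath);
//   try {
//     FixedFileTrailer trailer = FixedFileTrailer.readFromStream(in, fileSize);
//     System.out.println("HFile format version " + trailer.getVersion()
//         + ": " + trailer);
//   } finally {
//     in.close();
//   }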
public void expectVersion(int expected) {
if (version != expected) {
throw new IllegalArgumentException("Invalid HFile version: " + version
+ " (expected: " + expected + ")");
}
}
public void expectAtLeastVersion(int lowerBound) {
if (version < lowerBound) {
throw new IllegalArgumentException("Invalid HFile version: " + version
+ " (expected: " + lowerBound + " or higher).");
}
}
public long getFileInfoOffset() {
return fileInfoOffset;
}
public void setFileInfoOffset(long fileInfoOffset) {
this.fileInfoOffset = fileInfoOffset;
}
public long getLoadOnOpenDataOffset() {
return loadOnOpenDataOffset;
}
public void setLoadOnOpenOffset(long loadOnOpenDataOffset) {
this.loadOnOpenDataOffset = loadOnOpenDataOffset;
}
public int getDataIndexCount() {
return dataIndexCount;
}
public void setDataIndexCount(int dataIndexCount) {
this.dataIndexCount = dataIndexCount;
}
public int getMetaIndexCount() {
return metaIndexCount;
}
public void setMetaIndexCount(int metaIndexCount) {
this.metaIndexCount = metaIndexCount;
}
public long getTotalUncompressedBytes() {
return totalUncompressedBytes;
}
public void setTotalUncompressedBytes(long totalUncompressedBytes) {
this.totalUncompressedBytes = totalUncompressedBytes;
}
public long getEntryCount() {
return entryCount;
}
public void setEntryCount(long newEntryCount) {
if (version == 1) {
int intEntryCount = (int) Math.min(Integer.MAX_VALUE, newEntryCount);
if (intEntryCount != newEntryCount) {
LOG.info("Warning: entry count is " + newEntryCount + " but writing "
+ intEntryCount + " into the version " + version + " trailer");
}
entryCount = intEntryCount;
return;
}
entryCount = newEntryCount;
}
public Compression.Algorithm getCompressionCodec() {
return compressionCodec;
}
public void setCompressionCodec(Compression.Algorithm compressionCodec) {
this.compressionCodec = compressionCodec;
}
public int getNumDataIndexLevels() {
expectAtLeastVersion(2);
return numDataIndexLevels;
}
public void setNumDataIndexLevels(int numDataIndexLevels) {
expectAtLeastVersion(2);
this.numDataIndexLevels = numDataIndexLevels;
}
public long getLastDataBlockOffset() {
expectAtLeastVersion(2);
return lastDataBlockOffset;
}
public void setLastDataBlockOffset(long lastDataBlockOffset) {
expectAtLeastVersion(2);
this.lastDataBlockOffset = lastDataBlockOffset;
}
public long getFirstDataBlockOffset() {
expectAtLeastVersion(2);
return firstDataBlockOffset;
}
public void setFirstDataBlockOffset(long firstDataBlockOffset) {
expectAtLeastVersion(2);
this.firstDataBlockOffset = firstDataBlockOffset;
}
public int getVersion() {
return version;
}
@SuppressWarnings("rawtypes")
public void setComparatorClass(Class<? extends RawComparator> klass) {
expectAtLeastVersion(2);
comparatorClassName = klass.getName();
}
@SuppressWarnings("unchecked")
private static Class<? extends RawComparator<byte[]>> getComparatorClass(
String comparatorClassName) throws IOException {
try {
return (Class<? extends RawComparator<byte[]>>)
Class.forName(comparatorClassName);
} catch (ClassNotFoundException ex) {
throw new IOException(ex);
}
}
public static RawComparator<byte[]> createComparator(
String comparatorClassName) throws IOException {
try {
return getComparatorClass(comparatorClassName).newInstance();
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
RawComparator<byte[]> createComparator() throws IOException {
expectAtLeastVersion(2);
return createComparator(comparatorClassName);
}
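// Illustrative round trip (a sketch; it assumes KeyValue.KeyComparator is a
// RawComparator with the public no-argument constructor that
// createComparator(String) requires):
//
//   FixedFileTrailer t = new FixedFileTrailer(2);
//   t.setComparatorClass(KeyValue.KeyComparator.class);
//   RawComparator<byte[]> cmp = t.createComparator();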
public long getUncompressedDataIndexSize() {
if (version == 1)
return 0;
return uncompressedDataIndexSize;
}
public void setUncompressedDataIndexSize(
long uncompressedDataIndexSize) {
expectAtLeastVersion(2);
this.uncompressedDataIndexSize = uncompressedDataIndexSize;
}
}


@@ -0,0 +1,666 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import com.google.common.base.Preconditions;
/**
* {@link HFile} reader for version 1.
*/
public class HFileReaderV1 extends AbstractHFileReader {
private static final Log LOG = LogFactory.getLog(HFileReaderV1.class);
private volatile boolean fileInfoLoaded = false;
/**
* Opens an HFile. You must load the index by calling {@link #loadFileInfo()}
* before you can use this reader.
*
* @param fsdis input stream. Caller is responsible for closing the passed
* stream.
* @param size Length of the stream.
* @param blockCache block cache. Pass null if none.
* @param inMemory whether blocks should be marked as in-memory in cache
* @param evictOnClose whether blocks in cache should be evicted on close
* @throws IOException
*/
public HFileReaderV1(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long size,
final boolean closeIStream,
final BlockCache blockCache, final boolean inMemory,
final boolean evictOnClose) {
super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
evictOnClose);
trailer.expectVersion(1);
fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
}
private byte[] readAllIndex(final FSDataInputStream in,
final long indexOffset, final int indexSize) throws IOException {
byte[] allIndex = new byte[indexSize];
in.seek(indexOffset);
IOUtils.readFully(in, allIndex, 0, allIndex.length);
return allIndex;
}
/**
* Read in the index and file info.
*
* @return A map of fileinfo data.
* @see {@link Writer#appendFileInfo(byte[], byte[])}.
* @throws IOException
*/
@Override
public FileInfo loadFileInfo() throws IOException {
if (fileInfoLoaded)
return fileInfo;
// Read in the fileinfo and get what we need from it.
istream.seek(trailer.getFileInfoOffset());
fileInfo = new FileInfo();
fileInfo.readFields(istream);
lastKey = fileInfo.get(FileInfo.LASTKEY);
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
// Comparator is stored in the file info in version 1.
String clazzName = Bytes.toString(fileInfo.get(FileInfo.COMPARATOR));
comparator = getComparator(clazzName);
dataBlockIndexReader =
new HFileBlockIndex.BlockIndexReader(comparator, 1);
metaBlockIndexReader =
new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
int sizeToLoadOnOpen = (int) (fileSize - trailer.getLoadOnOpenDataOffset() -
trailer.getTrailerSize());
byte[] dataAndMetaIndex = readAllIndex(istream,
trailer.getLoadOnOpenDataOffset(), sizeToLoadOnOpen);
ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
DataInputStream dis = new DataInputStream(bis);
// Read in the data index.
if (trailer.getDataIndexCount() > 0)
BlockType.INDEX_V1.readAndCheck(dis);
dataBlockIndexReader.readRootIndex(dis, trailer.getDataIndexCount());
// Read in the metadata index.
if (trailer.getMetaIndexCount() > 0)
BlockType.INDEX_V1.readAndCheck(dis);
metaBlockIndexReader.readRootIndex(dis, trailer.getMetaIndexCount());
fileInfoLoaded = true;
return fileInfo;
}
/**
* Creates comparator from the given class name.
*
* @param clazzName the comparator class name read from the trailer
* @return an instance of the comparator to use
* @throws IOException in case comparator class name is invalid
*/
@SuppressWarnings("unchecked")
private RawComparator<byte[]> getComparator(final String clazzName)
throws IOException {
if (clazzName == null || clazzName.length() == 0) {
return null;
}
try {
return (RawComparator<byte[]>)Class.forName(clazzName).newInstance();
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
* nothing to clean up in a Scanner. Letting go of your references to the
* scanner is sufficient.
*
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is
* better for random reads, seek+read is better scanning).
* @param isCompaction is scanner being used for a compaction?
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
return new ScannerV1(this, cacheBlocks, pread, isCompaction);
}
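// Illustrative usage of a keyed seek (a sketch; the reader variable and the
// key bytes are assumptions made for the example):
//
//   HFileScanner scanner = reader.getScanner(true, false, false);
//   int result = scanner.seekTo(Bytes.toBytes("some-key"));
//   if (result == 0) {
//     KeyValue kv = scanner.getKeyValue(); // exact match, positioned on key
//   } else if (result == 1) {
//     // positioned on the last key that sorts before the requested key
//   } else { // result == -1
//     // the requested key sorts before the first key of the file
//   }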
/**
* @param key Key to search.
* @return Block number of the block containing the key or -1 if not in this
* file.
*/
protected int blockContainingKey(final byte[] key, int offset, int length) {
Preconditions.checkState(!dataBlockIndexReader.isEmpty(),
"Block index not loaded");
return dataBlockIndexReader.rootBlockContainingKey(key, offset, length);
}
/**
* @param metaBlockName
* @param cacheBlock Add block to cache, if found
* @return Block wrapped in a ByteBuffer
* @throws IOException
*/
@Override
public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
throws IOException {
if (trailer.getMetaIndexCount() == 0) {
return null; // there are no meta blocks
}
if (metaBlockIndexReader == null) {
throw new IOException("Meta index not loaded");
}
byte[] nameBytes = Bytes.toBytes(metaBlockName);
int block = metaBlockIndexReader.rootBlockContainingKey(nameBytes, 0,
nameBytes.length);
if (block == -1)
return null;
long offset = metaBlockIndexReader.getRootBlockOffset(block);
long nextOffset;
if (block == metaBlockIndexReader.getRootBlockCount() - 1) {
nextOffset = trailer.getFileInfoOffset();
} else {
nextOffset = metaBlockIndexReader.getRootBlockOffset(block + 1);
}
long startTimeNs = System.nanoTime();
String cacheKey = HFile.getBlockCacheKey(name, offset);
// Per meta key from any given file, synchronize reads for said block
synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
metaLoads.incrementAndGet();
// Check cache for block. If found return.
if (blockCache != null) {
HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey,
true);
if (cachedBlock != null) {
cacheHits.incrementAndGet();
return cachedBlock.getBufferWithoutHeader();
}
// Cache Miss, please load.
}
HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
true);
hfileBlock.expectType(BlockType.META);
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
HFile.readOps.incrementAndGet();
// Cache the block
if (cacheBlock && blockCache != null) {
blockCache.cacheBlock(cacheKey, hfileBlock, inMemory);
}
return hfileBlock.getBufferWithoutHeader();
}
}
/**
* Read in a file block.
* @param block Index of block to read.
* @param pread Use positional read instead of seek+read (positional is
* better doing random reads whereas seek+read is better scanning).
* @param isCompaction is this block being read as part of a compaction
* @return Block wrapped in a ByteBuffer.
* @throws IOException
*/
ByteBuffer readBlockBuffer(int block, boolean cacheBlock,
final boolean pread, final boolean isCompaction) throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException("Block index not loaded");
}
if (block < 0 || block >= dataBlockIndexReader.getRootBlockCount()) {
throw new IOException("Requested block is out of range: " + block +
", max: " + dataBlockIndexReader.getRootBlockCount());
}
long offset = dataBlockIndexReader.getRootBlockOffset(block);
String cacheKey = HFile.getBlockCacheKey(name, offset);
// For any given block from any given file, synchronize reads for said
// block.
// Without a cache, this synchronizing is needless overhead, but really
// the other choice is to duplicate work (which the cache would prevent you
// from doing).
synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
blockLoads.incrementAndGet();
// Check cache for block. If found return.
if (blockCache != null) {
HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey,
true);
if (cachedBlock != null) {
cacheHits.incrementAndGet();
return cachedBlock.getBufferWithoutHeader();
}
// Carry on, please load.
}
// Load block from filesystem.
long startTimeNs = System.nanoTime();
long nextOffset;
if (block == dataBlockIndexReader.getRootBlockCount() - 1) {
// Last block! The end of the data blocks is the first meta block if there
// is one, or the fileinfo offset if there isn't.
nextOffset = (metaBlockIndexReader.getRootBlockCount() == 0) ?
this.trailer.getFileInfoOffset() :
metaBlockIndexReader.getRootBlockOffset(0);
} else {
nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
}
HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
- offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
hfileBlock.expectType(BlockType.DATA);
ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
HFile.readOps.incrementAndGet();
// Cache the block
if (cacheBlock && blockCache != null) {
blockCache.cacheBlock(cacheKey, hfileBlock, inMemory);
}
return buf;
}
}
/**
* @return Last key in the file. May be null if file has no entries.
* Note that this is not the last rowkey, but rather the byte form of
* the last KeyValue.
*/
public byte[] getLastKey() {
if (!fileInfoLoaded) {
throw new RuntimeException("Load file info first");
}
return dataBlockIndexReader.isEmpty() ? null : lastKey;
}
/**
* @return Midkey for this file. We work at block boundaries only, so the
* returned midkey is only an approximation.
*
* @throws IOException
*/
@Override
public byte[] midkey() throws IOException {
Preconditions.checkState(isFileInfoLoaded(), "File info is not loaded");
Preconditions.checkState(!dataBlockIndexReader.isEmpty(),
"Data block index is not loaded or is empty");
return dataBlockIndexReader.midkey();
}
@Override
public void close() throws IOException {
if (evictOnClose && this.blockCache != null) {
int numEvicted = 0;
for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
if (blockCache.evictBlock(HFile.getBlockCacheKey(name,
dataBlockIndexReader.getRootBlockOffset(i))))
numEvicted++;
}
LOG.debug("On close of file " + name + " evicted " + numEvicted
+ " block(s) of " + dataBlockIndexReader.getRootBlockCount()
+ " total blocks");
}
if (this.closeIStream && this.istream != null) {
this.istream.close();
this.istream = null;
}
}
/**
* Implementation of {@link HFileScanner} interface.
*/
protected static class ScannerV1 extends AbstractHFileReader.Scanner {
private final HFileReaderV1 readerV1;
private int currBlock;
public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
super(reader, cacheBlocks, pread, isCompaction);
readerV1 = reader;
}
@Override
public KeyValue getKeyValue() {
if (blockBuffer == null) {
return null;
}
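// The key length and value length ints (2 * 4 bytes) have already been
// consumed from the buffer, so back up 8 bytes to point the KeyValue at the
// start of this entry.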
return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position() - 8);
}
@Override
public ByteBuffer getKey() {
Preconditions.checkState(blockBuffer != null && currKeyLen > 0,
"you need to seekTo() before calling getKey()");
ByteBuffer keyBuff = blockBuffer.slice();
keyBuff.limit(currKeyLen);
keyBuff.rewind();
// Do keyBuff.asReadOnly()?
return keyBuff;
}
@Override
public ByteBuffer getValue() {
if (blockBuffer == null || currKeyLen == 0) {
throw new RuntimeException(
"you need to seekTo() before calling getValue()");
}
// TODO: Could this be done with one ByteBuffer rather than create two?
ByteBuffer valueBuff = blockBuffer.slice();
valueBuff.position(currKeyLen);
valueBuff = valueBuff.slice();
valueBuff.limit(currValueLen);
valueBuff.rewind();
return valueBuff;
}
@Override
public boolean next() throws IOException {
if (blockBuffer == null) {
throw new IOException("Next called on non-seeked scanner");
}
try {
blockBuffer.position(blockBuffer.position() + currKeyLen
+ currValueLen);
} catch (IllegalArgumentException e) {
LOG.error("Current pos = " + blockBuffer.position() +
"; currKeyLen = " + currKeyLen +
"; currValLen = " + currValueLen +
"; block limit = " + blockBuffer.limit() +
"; HFile name = " + reader.getName() +
"; currBlock id = " + currBlock, e);
throw e;
}
if (blockBuffer.remaining() <= 0) {
// LOG.debug("Fetch next block");
currBlock++;
if (currBlock >= reader.getDataBlockIndexReader().getRootBlockCount()) {
// damn we are at the end
currBlock = 0;
blockBuffer = null;
return false;
}
blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
isCompaction);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
blockFetches++;
return true;
}
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
return true;
}
@Override
public int seekTo(byte[] key) throws IOException {
return seekTo(key, 0, key.length);
}
@Override
public int seekTo(byte[] key, int offset, int length) throws IOException {
int b = readerV1.blockContainingKey(key, offset, length);
if (b < 0) return -1; // falls before the beginning of the file! :-(
// Avoid re-reading the same block (that'd be dumb).
loadBlock(b, true);
return blockSeek(key, offset, length, false);
}
@Override
public int reseekTo(byte[] key) throws IOException {
return reseekTo(key, 0, key.length);
}
@Override
public int reseekTo(byte[] key, int offset, int length)
throws IOException {
if (blockBuffer != null && currKeyLen != 0) {
ByteBuffer bb = getKey();
int compared = reader.getComparator().compare(key, offset,
length, bb.array(), bb.arrayOffset(), bb.limit());
if (compared <= 0) {
// If the required key is less than or equal to current key, then
// don't do anything.
return compared;
}
}
int b = readerV1.blockContainingKey(key, offset, length);
if (b < 0) {
return -1;
}
loadBlock(b, false);
return blockSeek(key, offset, length, false);
}
/**
* Within a loaded block, seek looking for the first key
* that is smaller than (or equal to?) the key we are interested in.
*
* A note on seekBefore: if seekBefore = true AND the first key in the
* block equals the given key, an exception will be thrown.
* @param key the key to find
* @param seekBefore find the key before the exact match.
* @return 0 in case of an exact match, 1 otherwise
*/
private int blockSeek(byte[] key, int offset, int length,
boolean seekBefore) {
int klen, vlen;
int lastLen = 0;
do {
klen = blockBuffer.getInt();
vlen = blockBuffer.getInt();
int comp = reader.getComparator().compare(key, offset, length,
blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position(), klen);
if (comp == 0) {
if (seekBefore) {
blockBuffer.position(blockBuffer.position() - lastLen - 16);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
return 1; // non exact match.
}
currKeyLen = klen;
currValueLen = vlen;
return 0; // indicate exact match
}
if (comp < 0) {
// go back one key:
blockBuffer.position(blockBuffer.position() - lastLen - 16);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
return 1;
}
blockBuffer.position(blockBuffer.position() + klen + vlen);
lastLen = klen + vlen;
} while (blockBuffer.remaining() > 0);
// OK, we are at the end, so go back to the start of the last key/value.
// The 8 below is intentionally different from the 16s above: inside the
// loop we had already consumed the next entry's two 4-byte length fields,
// whereas here we only need to skip back over the last entry's data
// (lastLen) plus its own two length fields.
blockBuffer.position(blockBuffer.position() - lastLen - 8);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
return 1; // didn't exactly find it.
}
@Override
public boolean seekBefore(byte[] key) throws IOException {
return seekBefore(key, 0, key.length);
}
@Override
public boolean seekBefore(byte[] key, int offset, int length)
throws IOException {
int b = readerV1.blockContainingKey(key, offset, length);
if (b < 0)
return false; // key is before the start of the file.
// Question: does this block begin with 'key'?
byte[] firstKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
if (reader.getComparator().compare(firstKey, 0, firstKey.length,
key, offset, length) == 0) {
// Ok the key we're interested in is the first of the block, so go back
// by one.
if (b == 0) {
// we have a 'problem', the key we want is the first of the file.
return false;
}
b--;
// TODO shortcut: seek forward in this block to the last key of the
// block.
}
loadBlock(b, true);
blockSeek(key, offset, length, true);
return true;
}
@Override
public String getKeyString() {
return Bytes.toStringBinary(blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
}
@Override
public String getValueString() {
return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() +
blockBuffer.position() + currKeyLen, currValueLen);
}
@Override
public Reader getReader() {
return reader;
}
@Override
public boolean seekTo() throws IOException {
if (reader.getDataBlockIndexReader().isEmpty()) {
return false;
}
if (blockBuffer != null && currBlock == 0) {
blockBuffer.rewind();
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
return true;
}
currBlock = 0;
blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
isCompaction);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
blockFetches++;
return true;
}
private void loadBlock(int bloc, boolean rewind) throws IOException {
if (blockBuffer == null) {
blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
currBlock = bloc;
blockFetches++;
} else {
if (bloc != currBlock) {
blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
currBlock = bloc;
blockFetches++;
} else {
// we are already in the same block, just rewind to seek again.
if (rewind) {
blockBuffer.rewind();
}
else {
// Go back over the key length and value length ints (2 * 4 = 8 bytes).
blockBuffer.position(blockBuffer.position() - 8);
}
}
}
}
}
@Override
public HFileBlock readBlock(long offset, int onDiskBlockSize,
boolean cacheBlock, boolean pread, boolean isCompaction) {
throw new UnsupportedOperationException();
}
@Override
public DataInput getBloomFilterMetadata() throws IOException {
ByteBuffer buf = getMetaBlock(HFileWriterV1.BLOOM_FILTER_META_KEY, false);
if (buf == null)
return null;
ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),
buf.arrayOffset(), buf.limit());
return new DataInputStream(bais);
}
@Override
public boolean isFileInfoLoaded() {
return fileInfoLoaded;
}
}


@@ -0,0 +1,732 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
/**
* {@link HFile} reader for version 2.
*/
public class HFileReaderV2 extends AbstractHFileReader implements
HFileBlock.BasicReader {
private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
/**
* The size of a (key length, value length) tuple that prefixes each entry in
* a data block.
*/
private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
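// Each entry in a data block is laid out as
//   [int key length][int value length][key bytes][value bytes]
// so KEY_VALUE_LEN_SIZE is the size of the two length fields that prefix
// every key/value pair.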
/**
* A "sparse lock" implementation allowing to lock on a particular block
* identified by offset. The purpose of this is to avoid two clients loading
* the same block, and have all but one client wait to get the block from the
* cache.
*/
private IdLock offsetLock = new IdLock();
/**
* Blocks read from the load-on-open section, excluding data root index, meta
* index, and file info.
*/
private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
/**
* Opens an HFile. You must load the index by calling {@link #loadFileInfo()}
* before you can use this reader.
*
* @param fsdis input stream. Caller is responsible for closing the passed
* stream.
* @param size Length of the stream.
* @param blockCache block cache. Pass null if none.
* @param inMemory whether blocks should be marked as in-memory in cache
* @param evictOnClose whether blocks in cache should be evicted on close
* @throws IOException
*/
public HFileReaderV2(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long size,
final boolean closeIStream, final BlockCache blockCache,
final boolean inMemory, final boolean evictOnClose) throws IOException {
super(path, trailer, fsdis, size, closeIStream, blockCache, inMemory,
evictOnClose);
trailer.expectVersion(2);
fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
fileSize);
// Comparator class name is stored in the trailer in version 2.
comparator = trailer.createComparator();
dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
trailer.getNumDataIndexLevels(), this);
metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
Bytes.BYTES_RAWCOMPARATOR, 1);
// Parse load-on-open data.
HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
trailer.getLoadOnOpenDataOffset(),
fileSize - trailer.getTrailerSize());
// Data index. We also read statistics about the block index written after
// the root level.
dataBlockIndexReader.readMultiLevelIndexRoot(
blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
trailer.getDataIndexCount());
// Meta index.
metaBlockIndexReader.readRootIndex(
blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
trailer.getMetaIndexCount());
// File info
fileInfo = new FileInfo();
fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
lastKey = fileInfo.get(FileInfo.LASTKEY);
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
// Store all other load-on-open blocks for further consumption.
HFileBlock b;
while ((b = blockIter.nextBlock()) != null) {
loadOnOpenBlocks.add(b);
}
}
/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
* nothing to clean up in a Scanner. Letting go of your references to the
* scanner is sufficient.
*
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is
* better for random reads, seek+read is better scanning).
* @param isCompaction is scanner being used for a compaction?
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
return new ScannerV2(this, cacheBlocks, pread, isCompaction);
}
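// Illustrative usage for a full scan (a sketch; the reader variable is an
// assumption made for the example):
//
//   HFileScanner scanner = reader.getScanner(true, false, false);
//   if (scanner.seekTo()) {        // position at the first key/value
//     do {
//       KeyValue kv = scanner.getKeyValue();
//       // ... process kv ...
//     } while (scanner.next());    // advance until the last key/value
//   }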
/**
* @param metaBlockName
* @param cacheBlock Add block to cache, if found
* @return block wrapped in a ByteBuffer, with header skipped
* @throws IOException
*/
@Override
public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
throws IOException {
if (trailer.getMetaIndexCount() == 0) {
return null; // there are no meta blocks
}
if (metaBlockIndexReader == null) {
throw new IOException("Meta index not loaded");
}
byte[] mbname = Bytes.toBytes(metaBlockName);
int block = metaBlockIndexReader.rootBlockContainingKey(mbname, 0,
mbname.length);
if (block == -1)
return null;
long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
long startTimeNs = System.nanoTime();
// Per meta key from any given file, synchronize reads for said block. This
// is OK to do for meta blocks because the meta block index is always
// single-level.
synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
metaLoads.incrementAndGet();
// Check cache for block. If found return.
long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
String cacheKey = HFile.getBlockCacheKey(name, metaBlockOffset);
if (blockCache != null) {
HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey,
true);
if (cachedBlock != null) {
// Return a distinct 'shallow copy' of the block,
// so pos does not get messed by the scanner
cacheHits.incrementAndGet();
return cachedBlock.getBufferWithoutHeader();
}
// Cache Miss, please load.
}
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
blockSize, -1, true);
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
HFile.readOps.incrementAndGet();
// Cache the block
if (cacheBlock && blockCache != null) {
blockCache.cacheBlock(cacheKey, metaBlock, inMemory);
}
return metaBlock.getBufferWithoutHeader();
}
}
/**
* Implements the "basic block reader" API, used mainly by
* {@link HFileBlockIndex.BlockIndexReader} in
* {@link HFileBlockIndex.BlockIndexReader#seekToDataBlock(byte[], int, int,
* HFileBlock)} in a random-read access pattern.
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSize,
int uncompressedSize, boolean pread) throws IOException {
if (onDiskSize >= Integer.MAX_VALUE) {
throw new IOException("Invalid on-disk size: " + onDiskSize);
}
// Assuming we are not doing a compaction.
return readBlock(offset, (int) onDiskSize, true, pread, false);
}
/**
* Read in a file block.
*
* @param dataBlockOffset offset to read.
* @param onDiskSize size of the block
* @param pread Use positional read instead of seek+read (positional is better
* doing random reads whereas seek+read is better scanning).
* @param isCompaction is this block being read as part of a compaction
* @return Block wrapped in a ByteBuffer.
* @throws IOException
*/
@Override
public HFileBlock readBlock(long dataBlockOffset, int onDiskBlockSize,
boolean cacheBlock, boolean pread, final boolean isCompaction)
throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException("Block index not loaded");
}
if (dataBlockOffset < 0
|| dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
throw new IOException("Requested block is out of range: "
+ dataBlockOffset + ", lastDataBlockOffset: "
+ trailer.getLastDataBlockOffset());
}
// For any given block from any given file, synchronize reads for said
// block.
// Without a cache, this synchronizing is needless overhead, but really
// the other choice is to duplicate work (which the cache would prevent you
// from doing).
String cacheKey = HFile.getBlockCacheKey(name, dataBlockOffset);
IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
try {
blockLoads.incrementAndGet();
// Check cache for block. If found return.
if (blockCache != null) {
HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(cacheKey,
true);
if (cachedBlock != null) {
cacheHits.incrementAndGet();
return cachedBlock;
}
// Carry on, please load.
}
// Load block from filesystem.
long startTimeNs = System.nanoTime();
HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset,
onDiskBlockSize, -1, pread);
HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
HFile.readOps.incrementAndGet();
// Cache the block
if (cacheBlock && blockCache != null) {
blockCache.cacheBlock(cacheKey, dataBlock, inMemory);
}
return dataBlock;
} finally {
offsetLock.releaseLockEntry(lockEntry);
}
}
/**
* @return Last key in the file. May be null if file has no entries. Note that
* this is not the last row key, but rather the byte form of the last
* KeyValue.
*/
@Override
public byte[] getLastKey() {
return dataBlockIndexReader.isEmpty() ? null : lastKey;
}
/**
* @return Midkey for this file. We work at block boundaries only, so the
* returned midkey is only an approximation.
* @throws IOException
*/
@Override
public byte[] midkey() throws IOException {
return dataBlockIndexReader.midkey();
}
@Override
public void close() throws IOException {
if (evictOnClose && blockCache != null) {
int numEvicted = blockCache.evictBlocksByPrefix(name
+ HFile.CACHE_KEY_SEPARATOR);
LOG.debug("On close of file " + name + " evicted " + numEvicted
+ " block(s)");
}
if (closeIStream && istream != null) {
istream.close();
istream = null;
}
}
/**
* Implementation of {@link HFileScanner} interface.
*/
protected static class ScannerV2 extends AbstractHFileReader.Scanner {
private HFileBlock block;
public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
super(r, cacheBlocks, pread, isCompaction);
}
@Override
public KeyValue getKeyValue() {
if (!isSeeked())
return null;
return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position());
}
@Override
public ByteBuffer getKey() {
assertSeeked();
return ByteBuffer.wrap(
blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE, currKeyLen).slice();
}
@Override
public ByteBuffer getValue() {
assertSeeked();
return ByteBuffer.wrap(
blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
}
private void setNonSeekedState() {
block = null;
blockBuffer = null;
currKeyLen = 0;
currValueLen = 0;
}
/**
* Go to the next key/value in the block section. Loads the next block if
* necessary. If successful, {@link #getKey()} and {@link #getValue()} can
* be called.
*
* @return true if successfully navigated to the next key/value
*/
@Override
public boolean next() throws IOException {
assertSeeked();
try {
blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
+ currKeyLen + currValueLen);
} catch (IllegalArgumentException e) {
LOG.error("Current pos = " + blockBuffer.position()
+ "; currKeyLen = " + currKeyLen + "; currValLen = "
+ currValueLen + "; block limit = " + blockBuffer.limit()
+ "; HFile name = " + reader.getName()
+ "; currBlock currBlockOffset = " + block.getOffset());
throw e;
}
if (blockBuffer.remaining() <= 0) {
long lastDataBlockOffset =
reader.getTrailer().getLastDataBlockOffset();
if (block.getOffset() >= lastDataBlockOffset) {
setNonSeekedState();
return false;
}
// read the next block
HFileBlock nextBlock = readNextDataBlock();
if (nextBlock == null) {
setNonSeekedState();
return false;
}
updateCurrBlock(nextBlock);
return true;
}
// We are still in the same block.
readKeyValueLen();
return true;
}
/**
* Scans blocks in the "scanned" section of the {@link HFile} until the next
* data block is found.
*
* @return the next block, or null if there are no more data blocks
* @throws IOException
*/
private HFileBlock readNextDataBlock() throws IOException {
long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
if (block == null)
return null;
HFileBlock curBlock = block;
do {
if (curBlock.getOffset() >= lastDataBlockOffset)
return null;
if (curBlock.getOffset() < 0) {
throw new IOException("Invalid block file offset: " + block);
}
curBlock = reader.readBlock(curBlock.getOffset()
+ curBlock.getOnDiskSizeWithHeader(),
curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
isCompaction);
} while (!curBlock.getBlockType().equals(BlockType.DATA));
return curBlock;
}
/**
* Positions this scanner at the start of the file.
*
* @return false if empty file; i.e. a call to next would return false and
* the current key and value are undefined.
* @throws IOException
*/
@Override
public boolean seekTo() throws IOException {
if (reader == null) {
return false;
}
if (reader.getTrailer().getEntryCount() == 0) {
// No data blocks.
return false;
}
long firstDataBlockOffset =
reader.getTrailer().getFirstDataBlockOffset();
if (block != null && block.getOffset() == firstDataBlockOffset) {
blockBuffer.rewind();
readKeyValueLen();
return true;
}
block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction);
if (block.getOffset() < 0) {
throw new IOException("Invalid block offset: " + block.getOffset());
}
updateCurrBlock(block);
return true;
}
@Override
public int seekTo(byte[] key) throws IOException {
return seekTo(key, 0, key.length);
}
/**
* An internal API function. Seek to the given key, optionally rewinding to
* the first key of the block before doing the seek.
*
* @param key key byte array
* @param offset key offset in the key byte array
* @param length key length
* @param rewind whether to rewind to the first key of the block before
* doing the seek. If this is false, we are assuming we never go
* back, otherwise the result is undefined.
* @return -1 if the key is earlier than the first key of the file,
* 0 if we are at the given key, and 1 if we are past the given key
* @throws IOException
*/
private int seekTo(byte[] key, int offset, int length, boolean rewind)
throws IOException {
HFileBlock seekToBlock =
((HFileReaderV2) reader).getDataBlockIndexReader().seekToDataBlock(
key, offset, length, block);
if (seekToBlock == null) {
// This happens if the key e.g. falls before the beginning of the file.
return -1;
}
return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
false);
}
@Override
public int seekTo(byte[] key, int offset, int length) throws IOException {
// Always rewind to the first key of the block, because the given key
// might be before or after the current key.
return seekTo(key, offset, length, true);
}
@Override
public int reseekTo(byte[] key) throws IOException {
return reseekTo(key, 0, key.length);
}
@Override
public int reseekTo(byte[] key, int offset, int length) throws IOException {
if (isSeeked()) {
ByteBuffer bb = getKey();
int compared = reader.getComparator().compare(key, offset,
length, bb.array(), bb.arrayOffset(), bb.limit());
if (compared < 1) {
// If the required key is less than or equal to current key, then
// don't do anything.
return compared;
}
}
// Don't rewind on a reseek operation, because reseek implies that we are
// always going forward in the file.
return seekTo(key, offset, length, false);
}
private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
byte[] key, int offset, int length, boolean seekBefore)
throws IOException {
if (block == null || block.getOffset() != seekToBlock.getOffset()) {
updateCurrBlock(seekToBlock);
} else if (rewind) {
blockBuffer.rewind();
}
return blockSeek(key, offset, length, seekBefore);
}
/**
* Updates the current block to be the given {@link HFileBlock}. Seeks to
* the first key/value pair.
*
* @param newBlock the block to make current
*/
private void updateCurrBlock(HFileBlock newBlock) {
block = newBlock;
blockBuffer = block.getBufferWithoutHeader();
readKeyValueLen();
blockFetches++;
}
private final void readKeyValueLen() {
blockBuffer.mark();
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
blockBuffer.reset();
if (currKeyLen < 0 || currValueLen < 0
|| currKeyLen > blockBuffer.limit()
|| currValueLen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
+ " or currValueLen " + currValueLen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit()
+ ", position: " + blockBuffer.position() + " (without header).");
}
}
/**
* Within a loaded block, seek looking for the first key that is smaller
* than (or equal to?) the key we are interested in.
*
* A note on seekBefore: if seekBefore = true AND the first key in the block
* equals the given key, an exception will be thrown. The caller has to check
* for that case and load the previous block as appropriate.
*
* @param key the key to find
* @param seekBefore find the key before the given key in case of exact
* match.
* @return 0 in case of an exact key match, 1 in case of an inexact match
*/
private int blockSeek(byte[] key, int offset, int length,
boolean seekBefore) {
int klen, vlen;
int lastKeyValueSize = -1;
do {
blockBuffer.mark();
klen = blockBuffer.getInt();
vlen = blockBuffer.getInt();
blockBuffer.reset();
int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE;
int comp = reader.getComparator().compare(key, offset, length,
blockBuffer.array(), keyOffset, klen);
if (comp == 0) {
if (seekBefore) {
if (lastKeyValueSize < 0) {
throw new IllegalStateException("blockSeek with seekBefore "
+ "at the first key of the block: key="
+ Bytes.toStringBinary(key) + ", blockOffset="
+ block.getOffset() + ", onDiskSize="
+ block.getOnDiskSizeWithHeader());
}
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
readKeyValueLen();
return 1; // non exact match.
}
currKeyLen = klen;
currValueLen = vlen;
return 0; // indicate exact match
}
if (comp < 0) {
if (lastKeyValueSize > 0)
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
readKeyValueLen();
return 1;
}
// The size of this key/value tuple, including key/value length fields.
lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE;
blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
} while (blockBuffer.remaining() > 0);
// Seek to the last key we successfully read. This will happen if this is
// the last key/value pair in the file, in which case the following call
// to next() has to return false.
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
readKeyValueLen();
return 1; // didn't exactly find it.
}
@Override
public boolean seekBefore(byte[] key) throws IOException {
return seekBefore(key, 0, key.length);
}
private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
ByteBuffer buffer = curBlock.getBufferWithoutHeader();
// It is safe to manipulate this buffer because we own the buffer object.
buffer.rewind();
int klen = buffer.getInt();
buffer.getInt();
ByteBuffer keyBuff = buffer.slice();
keyBuff.limit(klen);
keyBuff.rewind();
return keyBuff;
}
@Override
public boolean seekBefore(byte[] key, int offset, int length)
throws IOException {
HFileReaderV2 reader2 = (HFileReaderV2) reader;
HFileBlock seekToBlock =
reader2.getDataBlockIndexReader().seekToDataBlock(
key, offset, length, block);
if (seekToBlock == null) {
return false;
}
ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
if (reader.getComparator().compare(firstKey.array(),
firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
{
long previousBlockOffset = seekToBlock.getPrevBlockOffset();
// The key we are interested in is the first key of this block, so go back
// one block.
if (previousBlockOffset == -1) {
// we have a 'problem', the key we want is the first of the file.
return false;
}
// It is important that we compute and pass onDiskSize to the block
// reader so that it does not have to read the header separately to
// figure out the size.
seekToBlock = reader2.fsBlockReader.readBlockData(previousBlockOffset,
seekToBlock.getOffset() - previousBlockOffset, -1, pread);
// TODO shortcut: seek forward in this block to the last key of the
// block.
}
loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
return true;
}
@Override
public String getKeyString() {
return Bytes.toStringBinary(blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE, currKeyLen);
}
@Override
public String getValueString() {
return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
currValueLen);
}
}
/**
* Returns a buffer with the Bloom filter metadata. The caller takes
* ownership of the buffer.
*/
@Override
public DataInput getBloomFilterMetadata() throws IOException {
for (HFileBlock b : loadOnOpenBlocks)
if (b.getBlockType() == BlockType.BLOOM_META)
return b.getByteStream();
return null;
}
@Override
public boolean isFileInfoLoaded() {
return true; // We load file info in constructor in version 2.
}
}


@@ -0,0 +1,483 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.Compressor;
/**
 * Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
*/
public class HFileWriterV1 extends AbstractHFileWriter {
/** Meta data block name for bloom filter parameters. */
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
/** Meta data block name for bloom filter bits. */
public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
private static final Log LOG = LogFactory.getLog(HFileWriterV1.class);
// A stream made per block written.
private DataOutputStream out;
// Offset where the current block began.
private long blockBegin;
// First keys of every block.
private ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
// Block offset in backing stream.
private ArrayList<Long> blockOffsets = new ArrayList<Long>();
// Raw (decompressed) data size.
private ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
private Compressor compressor;
// Additional byte array output stream used to fill block cache
private ByteArrayOutputStream baos;
private DataOutputStream baosDos;
private int blockNumber = 0;
static class WriterFactoryV1 extends HFile.WriterFactory {
WriterFactoryV1(Configuration conf) { super(conf); }
@Override
public Writer createWriter(FileSystem fs, Path path) throws IOException {
return new HFileWriterV1(conf, fs, path);
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
Compression.Algorithm compressAlgo, final KeyComparator comparator)
throws IOException {
return new HFileWriterV1(conf, fs, path, blockSize,
compressAlgo, comparator);
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
String compressAlgoName,
final KeyComparator comparator) throws IOException {
return new HFileWriterV1(conf, fs, path, blockSize,
compressAlgoName, comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final String compress,
final KeyComparator comparator) throws IOException {
return new HFileWriterV1(conf, ostream, blockSize, compress, comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final Compression.Algorithm compress,
final KeyComparator c) throws IOException {
return new HFileWriterV1(conf, ostream, blockSize, compress, c);
}
}
/** Constructor that uses all defaults for compression and block size. */
public HFileWriterV1(Configuration conf, FileSystem fs, Path path)
throws IOException {
this(conf, fs, path, HFile.DEFAULT_BLOCKSIZE,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
null);
}
/**
* Constructor that takes a path, creates and closes the output stream. Takes
* compression algorithm name as string.
*/
public HFileWriterV1(Configuration conf, FileSystem fs, Path path,
int blockSize, String compressAlgoName,
final KeyComparator comparator) throws IOException {
this(conf, fs, path, blockSize,
compressionByName(compressAlgoName), comparator);
}
/** Constructor that takes a path, creates and closes the output stream. */
public HFileWriterV1(Configuration conf, FileSystem fs, Path path,
int blockSize, Compression.Algorithm compress,
final KeyComparator comparator) throws IOException {
super(conf, createOutputStream(conf, fs, path), path,
blockSize, compress, comparator);
}
/** Constructor that takes a stream. */
public HFileWriterV1(Configuration conf,
final FSDataOutputStream outputStream, final int blockSize,
final String compressAlgoName, final KeyComparator comparator)
throws IOException {
this(conf, outputStream, blockSize,
Compression.getCompressionAlgorithmByName(compressAlgoName),
comparator);
}
/** Constructor that takes a stream. */
public HFileWriterV1(Configuration conf,
final FSDataOutputStream outputStream, final int blockSize,
final Compression.Algorithm compress, final KeyComparator comparator)
throws IOException {
super(conf, outputStream, null, blockSize, compress, comparator);
}
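  /*
   * A minimal usage sketch (assumes an existing Configuration, FileSystem and
   * Path; key/value contents are arbitrary examples):
   *
   *   HFile.Writer writer = new HFileWriterV1(conf, fs, path);
   *   writer.append(Bytes.toBytes("row1"), Bytes.toBytes("value1"));
   *   writer.append(Bytes.toBytes("row2"), Bytes.toBytes("value2"));
   *   writer.close();
   *
   * Keys must be appended in the order defined by the comparator; close()
   * writes the meta blocks, file info, block index and trailer.
   */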
/**
   * If at a block boundary, finishes the current block and opens a new one.
*
* @throws IOException
*/
private void checkBlockBoundary() throws IOException {
if (this.out != null && this.out.size() < blockSize)
return;
finishBlock();
newBlock();
}
/**
   * Finishes up the current block, if one is open.
*
* @throws IOException
*/
private void finishBlock() throws IOException {
if (this.out == null)
return;
long startTimeNs = System.nanoTime();
int size = releaseCompressingStream(this.out);
this.out = null;
blockKeys.add(firstKeyInBlock);
blockOffsets.add(Long.valueOf(blockBegin));
blockDataSizes.add(Integer.valueOf(size));
this.totalUncompressedBytes += size;
HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
HFile.writeOps.incrementAndGet();
if (cacheDataBlocksOnWrite) {
baosDos.flush();
byte[] bytes = baos.toByteArray();
blockCache.cacheBlock(HFile.getBlockCacheKey(name, blockBegin),
new HFileBlock(BlockType.DATA,
(int) (outputStream.getPos() - blockBegin), bytes.length, -1,
ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin));
baosDos.close();
}
blockNumber++;
}
/**
* Ready a new block for writing.
*
* @throws IOException
*/
private void newBlock() throws IOException {
// This is where the next block begins.
blockBegin = outputStream.getPos();
this.out = getCompressingStream();
BlockType.DATA.write(out);
firstKeyInBlock = null;
if (cacheDataBlocksOnWrite) {
this.baos = new ByteArrayOutputStream();
this.baosDos = new DataOutputStream(baos);
baosDos.write(HFileBlock.DUMMY_HEADER);
}
}
/**
   * Sets up a compressor and creates a compression stream on top of
   * this.outputStream. Obtain a new one for each block written.
   *
   * @return A compressing stream; if 'none' compression, the returned stream
   *         does not compress.
   *
   * @throws IOException
   *
   * @see #releaseCompressingStream(DataOutputStream)
*/
private DataOutputStream getCompressingStream() throws IOException {
this.compressor = compressAlgo.getCompressor();
    // Get a new DOS compression stream. In tfile, the DOS is not closed, just
    // finished, and that seems to be fine over there. TODO: check that the
    // DOS does not retain memory. Should we disable 'flush' on the DOS the
    // way BCFile over in tfile does? BCFile arranges for flushes not to go
    // through to the underlying compressed stream, so that the compressed
    // downstream is only flushed when done. However, it looks like when we
    // call flush here it is a legitimate flush that should go through to the
    // compressor.
OutputStream os = this.compressAlgo.createCompressionStream(
this.outputStream, this.compressor, 0);
return new DataOutputStream(os);
}
/**
   * Releases the block compressor and the compressing stream obtained from
   * {@link #getCompressingStream()}.
   *
   * @param dos the compressing stream to release
   *
   * @return the number of bytes written to this stream since it was obtained
*
* @see #getCompressingStream()
*
* @throws IOException
*/
private int releaseCompressingStream(final DataOutputStream dos)
throws IOException {
dos.flush();
this.compressAlgo.returnCompressor(this.compressor);
this.compressor = null;
return dos.size();
}
/**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with a batch of serialized data rather
   * than writing a separate meta block per metadata instance. If the metadata
   * is small, consider adding it to the file info instead, using
   * {@link #appendFileInfo(byte[], byte[])}.
*
* @param metaBlockName
* name of the block
* @param content
* will call readFields to get data later (DO NOT REUSE)
*/
public void appendMetaBlock(String metaBlockName, Writable content) {
byte[] key = Bytes.toBytes(metaBlockName);
int i;
for (i = 0; i < metaNames.size(); ++i) {
// stop when the current key is greater than our own
byte[] cur = metaNames.get(i);
if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
key.length) > 0) {
break;
}
}
metaNames.add(i, key);
metaData.add(i, content);
}
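  /*
   * Example (a sketch; "statsWritable" stands for any application-level
   * Writable):
   *
   *   writer.appendMetaBlock("MY_STATS", statsWritable);
   *
   * Meta block names are kept sorted with Bytes.BYTES_RAWCOMPARATOR so that
   * the meta block index written at close() is in key order.
   */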
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param kv
* KeyValue to add. Cannot be empty nor null.
* @throws IOException
*/
public void append(final KeyValue kv) throws IOException {
append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param key
* Key to add. Cannot be empty nor null.
* @param value
* Value to add. Cannot be empty nor null.
* @throws IOException
*/
public void append(final byte[] key, final byte[] value) throws IOException {
append(key, 0, key.length, value, 0, value.length);
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param key
* @param koffset
* @param klength
* @param value
* @param voffset
* @param vlength
* @throws IOException
*/
private void append(final byte[] key, final int koffset, final int klength,
final byte[] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
// Write length of key and value and then actual key and value bytes.
this.out.writeInt(klength);
totalKeyLength += klength;
this.out.writeInt(vlength);
totalValueLength += vlength;
this.out.write(key, koffset, klength);
this.out.write(value, voffset, vlength);
// Are we the first key in this block?
if (this.firstKeyInBlock == null) {
// Copy the key.
this.firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength);
}
this.lastKeyBuffer = key;
this.lastKeyOffset = koffset;
this.lastKeyLength = klength;
this.entryCount++;
// If we are pre-caching blocks on write, fill byte array stream
if (cacheDataBlocksOnWrite) {
this.baosDos.writeInt(klength);
this.baosDos.writeInt(vlength);
this.baosDos.write(key, koffset, klength);
this.baosDos.write(value, voffset, vlength);
}
}
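  /*
   * For reference, the record layout written above for each cell is:
   *
   *   int    key length
   *   int    value length
   *   byte[] key
   *   byte[] value
   */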
public void close() throws IOException {
if (this.outputStream == null) {
return;
}
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the file info, data block index and meta block index.
finishBlock();
FixedFileTrailer trailer = new FixedFileTrailer(1);
// Write out the metadata blocks if any.
ArrayList<Long> metaOffsets = null;
ArrayList<Integer> metaDataSizes = null;
if (metaNames.size() > 0) {
metaOffsets = new ArrayList<Long>(metaNames.size());
metaDataSizes = new ArrayList<Integer>(metaNames.size());
for (int i = 0; i < metaNames.size(); ++i) {
// store the beginning offset
long curPos = outputStream.getPos();
metaOffsets.add(curPos);
// write the metadata content
DataOutputStream dos = getCompressingStream();
BlockType.META.write(dos);
metaData.get(i).write(dos);
int size = releaseCompressingStream(dos);
// store the metadata size
metaDataSizes.add(size);
}
}
writeFileInfo(trailer, outputStream);
// Write the data block index.
trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream,
this.blockKeys, this.blockOffsets, this.blockDataSizes));
LOG.info("Wrote a version 1 block index with " + this.blockKeys.size()
+ " keys");
if (metaNames.size() > 0) {
// Write the meta index.
writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes);
}
// Now finish off the trailer.
trailer.setDataIndexCount(blockKeys.size());
finishClose(trailer);
}
@Override
protected void finishFileInfo() throws IOException {
super.finishFileInfo();
// In version 1, we store comparator name in the file info.
fileInfo.append(FileInfo.COMPARATOR,
Bytes.toBytes(comparator.getClass().getName()), false);
}
@Override
public void addInlineBlockWriter(InlineBlockWriter bloomWriter) {
// Inline blocks only exist in HFile format version 2.
throw new UnsupportedOperationException();
}
/**
* Version 1 Bloom filters are stored in two meta blocks with two different
* keys.
*/
@Override
public void addBloomFilter(BloomFilterWriter bfw) {
appendMetaBlock(BLOOM_FILTER_META_KEY,
bfw.getMetaWriter());
Writable dataWriter = bfw.getDataWriter();
if (dataWriter != null) {
appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter);
}
}
/**
   * Writes out the block index in the legacy version 1 format, which can
   * still be read by
   * {@link HFileBlockIndex.BlockIndexReader#readRootIndex(java.io.DataInputStream,
   * int)}.
   *
   * @param out the stream to write to
   * @param keys the keys identifying the indexed blocks (first keys for the
   *          data index, block names for the meta index)
   * @param offsets offsets of the indexed blocks in the output stream
   * @param uncompressedSizes in contrast with the version 2 root index format,
   *          the sizes stored in version 1 are uncompressed sizes
   * @return the offset in the output stream at which the index starts
* @throws IOException
*/
private static long writeBlockIndex(final FSDataOutputStream out,
final List<byte[]> keys, final List<Long> offsets,
final List<Integer> uncompressedSizes) throws IOException {
long pos = out.getPos();
// Don't write an index if nothing in the index.
if (keys.size() > 0) {
BlockType.INDEX_V1.write(out);
// Write the index.
for (int i = 0; i < keys.size(); ++i) {
out.writeLong(offsets.get(i).longValue());
out.writeInt(uncompressedSizes.get(i).intValue());
byte[] key = keys.get(i);
Bytes.writeByteArray(out, key);
}
}
return pos;
}
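  /*
   * For reference, the on-disk layout written above is the INDEX_V1 block
   * magic followed, for each indexed block, by:
   *
   *   long   offset of the block in the file
   *   int    uncompressed size of the block
   *   vInt   key length, then the key bytes (as written by
   *          Bytes.writeByteArray)
   *
   * The entry count itself is stored in the fixed file trailer, not in the
   * index block.
   */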
}


@@ -0,0 +1,452 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/**
* Writes HFile format version 2.
*/
public class HFileWriterV2 extends AbstractHFileWriter {
/** Inline block writers for multi-level block index and compound Blooms. */
private List<InlineBlockWriter> inlineBlockWriters =
new ArrayList<InlineBlockWriter>();
/** Unified version 2 block writer */
private HFileBlock.Writer fsBlockWriter;
private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
/** The offset of the first data block or -1 if the file is empty. */
private long firstDataBlockOffset = -1;
/** The offset of the last data block or 0 if the file is empty. */
private long lastDataBlockOffset;
/** Additional data items to be written to the "load-on-open" section. */
private List<BlockWritable> additionalLoadOnOpenData =
new ArrayList<BlockWritable>();
static class WriterFactoryV2 extends HFile.WriterFactory {
WriterFactoryV2(Configuration conf) { super(conf); }
@Override
public Writer createWriter(FileSystem fs, Path path)
throws IOException {
return new HFileWriterV2(conf, fs, path);
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
Compression.Algorithm compress,
final KeyComparator comparator) throws IOException {
return new HFileWriterV2(conf, fs, path, blockSize,
compress, comparator);
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
String compress, final KeyComparator comparator)
throws IOException {
return new HFileWriterV2(conf, fs, path, blockSize,
compress, comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final String compress,
final KeyComparator comparator) throws IOException {
return new HFileWriterV2(conf, ostream, blockSize, compress, comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final Compression.Algorithm compress,
final KeyComparator c) throws IOException {
return new HFileWriterV2(conf, ostream, blockSize, compress, c);
}
}
/** Constructor that uses all defaults for compression and block size. */
public HFileWriterV2(Configuration conf, FileSystem fs, Path path)
throws IOException {
this(conf, fs, path, HFile.DEFAULT_BLOCKSIZE,
HFile.DEFAULT_COMPRESSION_ALGORITHM, null);
}
/**
* Constructor that takes a path, creates and closes the output stream. Takes
* compression algorithm name as string.
*/
public HFileWriterV2(Configuration conf, FileSystem fs, Path path,
int blockSize, String compressAlgoName,
final KeyComparator comparator) throws IOException {
this(conf, fs, path, blockSize,
compressionByName(compressAlgoName), comparator);
}
/** Constructor that takes a path, creates and closes the output stream. */
public HFileWriterV2(Configuration conf, FileSystem fs, Path path,
int blockSize, Compression.Algorithm compressAlgo,
final KeyComparator comparator) throws IOException {
super(conf, createOutputStream(conf, fs, path), path,
blockSize, compressAlgo, comparator);
finishInit(conf);
}
/** Constructor that takes a stream. */
public HFileWriterV2(final Configuration conf,
final FSDataOutputStream outputStream, final int blockSize,
final String compressAlgoName, final KeyComparator comparator)
throws IOException {
this(conf, outputStream, blockSize,
Compression.getCompressionAlgorithmByName(compressAlgoName),
comparator);
}
/** Constructor that takes a stream. */
public HFileWriterV2(final Configuration conf,
final FSDataOutputStream outputStream, final int blockSize,
final Compression.Algorithm compress, final KeyComparator comparator)
throws IOException {
super(conf, outputStream, null, blockSize, compress, comparator);
finishInit(conf);
}
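  /*
   * A minimal usage sketch (assumes an existing Configuration, FileSystem and
   * Path; "gz" stands for any supported compression algorithm name and kv for
   * a KeyValue to be stored):
   *
   *   HFile.Writer writer = new HFileWriterV2(conf, fs, path, 64 * 1024,
   *       "gz", KeyValue.KEY_COMPARATOR);
   *   writer.append(kv);   // KeyValues must arrive in comparator order
   *   writer.close();      // writes the indexes, file info and trailer
   */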
/** Additional initialization steps */
private void finishInit(final Configuration conf) {
if (fsBlockWriter != null)
throw new IllegalStateException("finishInit called twice");
// HFile filesystem-level (non-caching) block writer
fsBlockWriter = new HFileBlock.Writer(compressAlgo);
// Data block index writer
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
cacheIndexBlocksOnWrite ? blockCache : null,
cacheIndexBlocksOnWrite ? name : null);
dataBlockIndexWriter.setMaxChunkSize(
HFileBlockIndex.getMaxChunkSize(conf));
inlineBlockWriters.add(dataBlockIndexWriter);
// Meta data block index writer
metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
}
/**
   * At a block boundary, finishes the current block, writes any pending
   * inline blocks, and opens a new data block.
*
* @throws IOException
*/
private void checkBlockBoundary() throws IOException {
if (fsBlockWriter.blockSizeWritten() < blockSize)
return;
finishBlock();
writeInlineBlocks(false);
newBlock();
}
/** Clean up the current block */
private void finishBlock() throws IOException {
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
return;
long startTimeNs = System.nanoTime();
// Update the first data block offset for scanning.
if (firstDataBlockOffset == -1)
firstDataBlockOffset = outputStream.getPos();
// Update the last data block offset
lastDataBlockOffset = outputStream.getPos();
fsBlockWriter.writeHeaderAndData(outputStream);
int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
onDiskSize);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
HFile.writeOps.incrementAndGet();
if (cacheDataBlocksOnWrite) {
blockCache.cacheBlock(HFile.getBlockCacheKey(name, lastDataBlockOffset),
fsBlockWriter.getBlockForCaching());
}
}
/** Gives inline block writers an opportunity to contribute blocks. */
private void writeInlineBlocks(boolean closing) throws IOException {
for (InlineBlockWriter ibw : inlineBlockWriters) {
while (ibw.shouldWriteBlock(closing)) {
long offset = outputStream.getPos();
boolean cacheThisBlock = ibw.cacheOnWrite();
ibw.writeInlineBlock(fsBlockWriter.startWriting(
ibw.getInlineBlockType(), cacheThisBlock));
fsBlockWriter.writeHeaderAndData(outputStream);
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
if (cacheThisBlock) {
// Cache this block on write.
blockCache.cacheBlock(HFile.getBlockCacheKey(name, offset),
fsBlockWriter.getBlockForCaching());
}
}
}
}
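  /*
   * For reference: this is called at every data block boundary and once more
   * with closing == true from close(). The data block index writer registered
   * in finishInit() is one such inline writer; it presumably asks to flush an
   * intermediate index chunk once its accumulated entries exceed the max
   * chunk size configured via setMaxChunkSize above. Compound Bloom filter
   * writers added through addInlineBlockWriter(InlineBlockWriter) are handled
   * the same way.
   */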
/**
* Ready a new block for writing.
*
* @throws IOException
*/
private void newBlock() throws IOException {
// This is where the next block begins.
fsBlockWriter.startWriting(BlockType.DATA, cacheDataBlocksOnWrite);
firstKeyInBlock = null;
}
/**
   * Add a meta block to the end of the file. Call before close(). Metadata
   * blocks are expensive: fill one with a batch of serialized data rather
   * than writing a separate meta block per metadata instance. If the metadata
   * is small, consider adding it to the file info instead, using
   * {@link #appendFileInfo(byte[], byte[])}.
*
* @param metaBlockName
* name of the block
* @param content
* will call readFields to get data later (DO NOT REUSE)
*/
@Override
public void appendMetaBlock(String metaBlockName, Writable content) {
byte[] key = Bytes.toBytes(metaBlockName);
int i;
for (i = 0; i < metaNames.size(); ++i) {
// stop when the current key is greater than our own
byte[] cur = metaNames.get(i);
if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
key.length) > 0) {
break;
}
}
metaNames.add(i, key);
metaData.add(i, content);
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param kv
* KeyValue to add. Cannot be empty nor null.
* @throws IOException
*/
@Override
public void append(final KeyValue kv) throws IOException {
append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param key
* Key to add. Cannot be empty nor null.
* @param value
* Value to add. Cannot be empty nor null.
* @throws IOException
*/
@Override
public void append(final byte[] key, final byte[] value) throws IOException {
append(key, 0, key.length, value, 0, value.length);
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param key
* @param koffset
* @param klength
* @param value
* @param voffset
* @param vlength
* @throws IOException
*/
private void append(final byte[] key, final int koffset, final int klength,
final byte[] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
if (!fsBlockWriter.isWriting())
newBlock();
// Write length of key and value and then actual key and value bytes.
{
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
totalKeyLength += klength;
out.writeInt(vlength);
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
}
// Are we the first key in this block?
if (firstKeyInBlock == null) {
// Copy the key.
firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
}
lastKeyBuffer = key;
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
}
@Override
public void close() throws IOException {
if (outputStream == null) {
return;
}
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the file info, data block index and meta block index.
finishBlock();
writeInlineBlocks(true);
FixedFileTrailer trailer = new FixedFileTrailer(2);
// Write out the metadata blocks if any.
if (!metaNames.isEmpty()) {
for (int i = 0; i < metaNames.size(); ++i) {
// store the beginning offset
long offset = outputStream.getPos();
// write the metadata content
DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META,
cacheDataBlocksOnWrite);
metaData.get(i).write(dos);
fsBlockWriter.writeHeaderAndData(outputStream);
// Add the new meta block to the meta index.
metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
fsBlockWriter.getOnDiskSizeWithHeader());
}
}
// Load-on-open section.
// Data block index.
//
// In version 2, this section of the file starts with the root level data
// block index. We call a function that writes intermediate-level blocks
// first, then root level, and returns the offset of the root level block
// index.
long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
trailer.setLoadOnOpenOffset(rootIndexOffset);
// Meta block index.
metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
BlockType.ROOT_INDEX, false), "meta");
fsBlockWriter.writeHeaderAndData(outputStream);
// File info
writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
false));
fsBlockWriter.writeHeaderAndData(outputStream);
// Load-on-open data supplied by higher levels, e.g. Bloom filters.
for (BlockWritable w : additionalLoadOnOpenData)
fsBlockWriter.writeBlock(w, outputStream);
// Now finish off the trailer.
trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
trailer.setUncompressedDataIndexSize(
dataBlockIndexWriter.getTotalUncompressedSize());
trailer.setFirstDataBlockOffset(firstDataBlockOffset);
trailer.setLastDataBlockOffset(lastDataBlockOffset);
trailer.setComparatorClass(comparator.getClass());
trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
finishClose(trailer);
fsBlockWriter.releaseCompressor();
}
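  /*
   * For reference, the layout produced by close() above, after the last data
   * block, is:
   *
   *   1. Any remaining inline blocks (writeInlineBlocks(true))
   *   2. Meta blocks, if any
   *   3. Intermediate-level index blocks followed by the root-level data
   *      block index (writeIndexBlocks)
   *   4. Meta block index (a single-level ROOT_INDEX block)
   *   5. File info block
   *   6. Additional load-on-open blocks, e.g. Bloom filter metadata
   *   7. Fixed file trailer (finishClose(trailer))
   */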
@Override
public void addInlineBlockWriter(InlineBlockWriter ibw) {
inlineBlockWriters.add(ibw);
if (blockCache == null && ibw.cacheOnWrite())
initBlockCache();
}
@Override
public void addBloomFilter(final BloomFilterWriter bfw) {
if (bfw.getKeyCount() <= 0)
return;
additionalLoadOnOpenData.add(new BlockWritable() {
@Override
public BlockType getBlockType() {
return BlockType.BLOOM_META;
}
@Override
public void writeToBlock(DataOutput out) throws IOException {
bfw.getMetaWriter().write(out);
Writable dataWriter = bfw.getDataWriter();
if (dataWriter != null)
dataWriter.write(out);
}
});
}
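  /*
   * Note the contrast with version 1: HFileWriterV1 stores the Bloom filter
   * in two meta blocks (BLOOM_FILTER_META_KEY and BLOOM_FILTER_DATA_KEY),
   * whereas here the metadata and data are serialized into a single
   * BLOOM_META block in the load-on-open section, which the version 2 reader
   * returns from getBloomFilterMetadata().
   */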
}


@@ -0,0 +1,61 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.io.Writable;
/**
* Specifies methods needed to add elements to a Bloom filter and serialize the
* resulting Bloom filter as a sequence of bytes.
*/
public interface BloomFilterWriter extends BloomFilterBase {
  /** Allocate memory for the Bloom filter data. */
void allocBloom();
/** Compact the Bloom filter before writing metadata & data to disk. */
void compactBloom();
  /**
   * Get a writable interface into the Bloom filter metadata.
   *
   * @return a writable instance that can later be written to a stream
   */
  Writable getMetaWriter();
  /**
   * Get a writable interface into the Bloom filter data (the actual Bloom
   * bits). Not used for compound Bloom filters.
   *
   * @return a writable instance that can later be written to a stream
   */
Writable getDataWriter();
  /**
   * Add the specified bytes to the Bloom filter.
   *
   * @param buf data to be added to the Bloom filter
   * @param offset offset into the data to be added
   * @param len length of the data to be added
   */
void add(byte[] buf, int offset, int len);
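  /*
   * A typical write-side sequence, as far as it can be inferred from the
   * HFile writers above: allocBloom(), then add(...) for every key, then
   * compactBloom(), after which the HFile writer serializes getMetaWriter()
   * and, if non-null, getDataWriter() (see HFileWriterV1.addBloomFilter and
   * HFileWriterV2.addBloomFilter).
   */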
}