Revert "HBASE-Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc."

This reverts commit 5b25a48e7f.
stack 2015-04-03 15:16:38 -07:00
parent 5b25a48e7f
commit 319666ca53
34 changed files with 1996 additions and 1498 deletions

View File

@@ -24,8 +24,8 @@ import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV3;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV3;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
@@ -46,8 +46,8 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
   static {
     // These log level changes are only useful when running on a localhost
     // cluster.
-    Logger.getLogger(HFileReaderImpl.class).setLevel(Level.TRACE);
-    Logger.getLogger(HFileWriterImpl.class).setLevel(Level.TRACE);
+    Logger.getLogger(HFileReaderV3.class).setLevel(Level.TRACE);
+    Logger.getLogger(HFileWriterV3.class).setLevel(Level.TRACE);
     Logger.getLogger(SecureProtobufLogReader.class).setLevel(Level.TRACE);
     Logger.getLogger(SecureProtobufLogWriter.class).setLevel(Level.TRACE);
   }

View File

@@ -0,0 +1,352 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
/**
* Common functionality needed by all versions of {@link HFile} readers.
*/
@InterfaceAudience.Private
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
public abstract class AbstractHFileReader
implements HFile.Reader, Configurable {
/** Stream to read from. Does checksum verifications in file system */
protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
/** The file system stream of the underlying {@link HFile} that
* does not do checksum verification in the file system */
protected FSDataInputStream istreamNoFsChecksum; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
/** Data block index reader keeping the root data index in memory */
protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
/** Meta block index reader -- always single level */
protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
protected final FixedFileTrailer trailer;
/** Filled when we read in the trailer. */
protected final Compression.Algorithm compressAlgo;
/**
* What kind of data block encoding should be used while reading, writing,
* and handling cache.
*/
protected HFileDataBlockEncoder dataBlockEncoder =
NoOpDataBlockEncoder.INSTANCE;
/** Last key in the file. Filled in when we read in the file info */
protected byte [] lastKey = null;
/** Average key length read from file info */
protected int avgKeyLen = -1;
/** Average value length read from file info */
protected int avgValueLen = -1;
/** Key comparator */
protected KVComparator comparator = new KVComparator();
/** Size of this file. */
protected final long fileSize;
/** Block cache configuration. */
protected final CacheConfig cacheConf;
/** Path of file */
protected final Path path;
/** File name to be used for block names */
protected final String name;
protected FileInfo fileInfo;
/** The filesystem used for accesing data */
protected HFileSystem hfs;
protected Configuration conf;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
final Configuration conf) {
this.trailer = trailer;
this.compressAlgo = trailer.getCompressionCodec();
this.cacheConf = cacheConf;
this.fileSize = fileSize;
this.path = path;
this.name = path.getName();
this.hfs = hfs; // URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD
this.conf = conf;
}
@SuppressWarnings("serial")
public static class BlockIndexNotLoadedException
extends IllegalStateException {
public BlockIndexNotLoadedException() {
// Add a message in case anyone relies on it as opposed to class name.
super("Block index not loaded");
}
}
protected String toStringFirstKey() {
return KeyValue.keyToString(getFirstKey());
}
protected String toStringLastKey() {
return KeyValue.keyToString(getLastKey());
}
public abstract boolean isFileInfoLoaded();
@Override
public String toString() {
return "reader=" + path.toString() +
(!isFileInfoLoaded()? "":
", compression=" + compressAlgo.getName() +
", cacheConf=" + cacheConf +
", firstKey=" + toStringFirstKey() +
", lastKey=" + toStringLastKey()) +
", avgKeyLen=" + avgKeyLen +
", avgValueLen=" + avgValueLen +
", entries=" + trailer.getEntryCount() +
", length=" + fileSize;
}
@Override
public long length() {
return fileSize;
}
/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
* nothing to clean up in a Scanner. Letting go of your references to the
* scanner is sufficient. NOTE: Do not use this overload of getScanner for
* compactions.
*
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is
* better for random reads, seek+read is better scanning).
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
return getScanner(cacheBlocks, pread, false);
}
/**
* @return the first key in the file. May be null if file has no entries. Note
* that this is not the first row key, but rather the byte form of the
* first KeyValue.
*/
@Override
public byte [] getFirstKey() {
if (dataBlockIndexReader == null) {
throw new BlockIndexNotLoadedException();
}
return dataBlockIndexReader.isEmpty() ? null
: dataBlockIndexReader.getRootBlockKey(0);
}
/**
* TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
* patch goes in to eliminate {@link KeyValue} here.
*
* @return the first row key, or null if the file is empty.
*/
@Override
public byte[] getFirstRowKey() {
byte[] firstKey = getFirstKey();
if (firstKey == null)
return null;
return KeyValue.createKeyValueFromKey(firstKey).getRow();
}
/**
* TODO left from {@link HFile} version 1: move this to StoreFile after
* Ryan's patch goes in to eliminate {@link KeyValue} here.
*
* @return the last row key, or null if the file is empty.
*/
@Override
public byte[] getLastRowKey() {
byte[] lastKey = getLastKey();
if (lastKey == null)
return null;
return KeyValue.createKeyValueFromKey(lastKey).getRow();
}
/** @return number of KV entries in this HFile */
@Override
public long getEntries() {
return trailer.getEntryCount();
}
/** @return comparator */
@Override
public KVComparator getComparator() {
return comparator;
}
/** @return compression algorithm */
@Override
public Compression.Algorithm getCompressionAlgorithm() {
return compressAlgo;
}
/**
* @return the total heap size of data and meta block indexes in bytes. Does
* not take into account non-root blocks of a multilevel data index.
*/
public long indexSize() {
return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
+ ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
: 0);
}
@Override
public String getName() {
return name;
}
@Override
public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
return dataBlockIndexReader;
}
@Override
public FixedFileTrailer getTrailer() {
return trailer;
}
@Override
public FileInfo loadFileInfo() throws IOException {
return fileInfo;
}
/**
* An exception thrown when an operation requiring a scanner to be seeked
* is invoked on a scanner that is not seeked.
*/
@SuppressWarnings("serial")
public static class NotSeekedException extends IllegalStateException {
public NotSeekedException() {
super("Not seeked to a key/value");
}
}
protected static abstract class Scanner implements HFileScanner {
protected ByteBuffer blockBuffer;
protected boolean cacheBlocks;
protected final boolean pread;
protected final boolean isCompaction;
protected int currKeyLen;
protected int currValueLen;
protected int currMemstoreTSLen;
protected long currMemstoreTS;
protected int blockFetches;
protected final HFile.Reader reader;
public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
this.reader = reader;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
this.isCompaction = isCompaction;
}
@Override
public boolean isSeeked(){
return blockBuffer != null;
}
@Override
public String toString() {
return "HFileScanner for reader " + String.valueOf(getReader());
}
protected void assertSeeked() {
if (!isSeeked())
throw new NotSeekedException();
}
@Override
public int seekTo(byte[] key) throws IOException {
return seekTo(key, 0, key.length);
}
@Override
public boolean seekBefore(byte[] key) throws IOException {
return seekBefore(key, 0, key.length);
}
@Override
public int reseekTo(byte[] key) throws IOException {
return reseekTo(key, 0, key.length);
}
@Override
public HFile.Reader getReader() {
return reader;
}
}
/** For testing */
abstract HFileBlock.FSReader getUncachedBlockReader();
public Path getPath() {
return path;
}
@Override
public DataBlockEncoding getDataBlockEncoding() {
return dataBlockEncoder.getDataBlockEncoding();
}
public abstract int getMajorVersion();
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
}
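The scanner contract spelled out above (seek first, then read; dropping references is the only cleanup) is easiest to see in a short usage sketch. This is a minimal sketch, assuming an HBase classpath of this era, with imports omitted and a hypothetical serialized key; the cell accessors are only referenced in a comment.

static void scanFrom(FileSystem fs, Path path, CacheConfig cacheConf,
    Configuration conf, byte[] serializedKey) throws IOException {
  HFile.Reader reader = HFile.createReader(fs, path, cacheConf, conf);
  try {
    reader.loadFileInfo();                                  // load FileInfo and block index first
    HFileScanner scanner = reader.getScanner(true, false);  // cacheBlocks=true, pread=false
    // seekTo: -1 => key sorts before the first key (scanner not positioned),
    //          0 => exact match, 1 => positioned at the closest preceding key.
    if (scanner.seekTo(serializedKey) != -1 && scanner.isSeeked()) {
      do {
        // current entry is available through the scanner accessors (getKeyValue() and friends)
      } while (scanner.next());
    }
  } finally {
    reader.close();
  }
}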

View File

@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
/**
* Common functionality needed by all versions of {@link HFile} writers.
*/
@InterfaceAudience.Private
public abstract class AbstractHFileWriter implements HFile.Writer {
/** The Cell previously appended. Becomes the last cell in the file.*/
protected Cell lastCell = null;
/** FileSystem stream to write into. */
protected FSDataOutputStream outputStream;
/** True if we opened the <code>outputStream</code> (and so will close it). */
protected final boolean closeOutputStream;
/** A "file info" block: a key-value map of file-wide metadata. */
protected FileInfo fileInfo = new HFile.FileInfo();
/** Total # of key/value entries, i.e. how many times add() was called. */
protected long entryCount = 0;
/** Used for calculating the average key length. */
protected long totalKeyLength = 0;
/** Used for calculating the average value length. */
protected long totalValueLength = 0;
/** Total uncompressed bytes, maybe calculate a compression ratio later. */
protected long totalUncompressedBytes = 0;
/** Key comparator. Used to ensure we write in order. */
protected final KVComparator comparator;
/** Meta block names. */
protected List<byte[]> metaNames = new ArrayList<byte[]>();
/** {@link Writable}s representing meta block data. */
protected List<Writable> metaData = new ArrayList<Writable>();
/**
* First cell in a block.
* This reference should be short-lived since we write hfiles in a burst.
*/
protected Cell firstCellInBlock = null;
/** May be null if we were passed a stream. */
protected final Path path;
/** Cache configuration for caching data on write. */
protected final CacheConfig cacheConf;
/**
* Name for this object used when logging or in toString. Is either
* the result of a toString on stream or else name of passed file Path.
*/
protected final String name;
/**
* The data block encoding which will be used.
* {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
*/
protected final HFileDataBlockEncoder blockEncoder;
protected final HFileContext hFileContext;
public AbstractHFileWriter(CacheConfig cacheConf,
FSDataOutputStream outputStream, Path path,
KVComparator comparator, HFileContext fileContext) {
this.outputStream = outputStream;
this.path = path;
this.name = path != null ? path.getName() : outputStream.toString();
this.hFileContext = fileContext;
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
if (encoding != DataBlockEncoding.NONE) {
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
} else {
this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
}
this.comparator = comparator != null ? comparator
: KeyValue.COMPARATOR;
closeOutputStream = path != null;
this.cacheConf = cacheConf;
}
/**
* Add last bits of metadata to file info before it is written out.
*/
protected void finishFileInfo() throws IOException {
if (lastCell != null) {
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
// byte buffer. Won't take a tuple.
byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
fileInfo.append(FileInfo.LASTKEY, lastKey, false);
}
// Average key length.
int avgKeyLen =
entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
// Average value length.
int avgValueLen =
entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
false);
}
/**
* Add to the file info. All added key/value pairs can be obtained using
* {@link HFile.Reader#loadFileInfo()}.
*
* @param k Key
* @param v Value
* @throws IOException in case the key or the value are invalid
*/
@Override
public void appendFileInfo(final byte[] k, final byte[] v)
throws IOException {
fileInfo.append(k, v, true);
}
/**
* Sets the file info offset in the trailer, finishes up populating fields in
* the file info, and writes the file info into the given data output. The
* reason the data output is not always {@link #outputStream} is that we store
* file info as a block in version 2.
*
* @param trailer fixed file trailer
* @param out the data output to write the file info to
* @throws IOException
*/
protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
throws IOException {
trailer.setFileInfoOffset(outputStream.getPos());
finishFileInfo();
fileInfo.write(out);
}
/**
* Checks that the given Cell's key does not violate the key order.
*
* @param cell Cell whose key to check.
* @return true if the key is duplicate
* @throws IOException if the key or the key order is wrong
*/
protected boolean checkKey(final Cell cell) throws IOException {
boolean isDuplicateKey = false;
if (cell == null) {
throw new IOException("Key cannot be null or empty");
}
if (lastCell != null) {
int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
if (keyComp > 0) {
throw new IOException("Added a key not lexically larger than"
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
} else if (keyComp == 0) {
isDuplicateKey = true;
}
}
return isDuplicateKey;
}
/** Checks the given value for validity. */
protected void checkValue(final byte[] value, final int offset,
final int length) throws IOException {
if (value == null) {
throw new IOException("Value cannot be null");
}
}
/**
* @return Path or null if we were passed a stream rather than a Path.
*/
@Override
public Path getPath() {
return path;
}
@Override
public String toString() {
return "writer=" + (path != null ? path.toString() : null) + ", name="
+ name + ", compression=" + hFileContext.getCompression().getName();
}
/**
* Sets remaining trailer fields, writes the trailer to disk, and optionally
* closes the output stream.
*/
protected void finishClose(FixedFileTrailer trailer) throws IOException {
trailer.setMetaIndexCount(metaNames.size());
trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
trailer.setEntryCount(entryCount);
trailer.setCompressionCodec(hFileContext.getCompression());
trailer.serialize(outputStream);
if (closeOutputStream) {
outputStream.close();
outputStream = null;
}
}
public static Compression.Algorithm compressionByName(String algoName) {
if (algoName == null)
return HFile.DEFAULT_COMPRESSION_ALGORITHM;
return Compression.getCompressionAlgorithmByName(algoName);
}
/** A helper method to create HFile output streams in constructors */
protected static FSDataOutputStream createOutputStream(Configuration conf,
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
return FSUtils.create(fs, path, perms, favoredNodes);
}
}
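The ordering rule that checkKey() enforces is easier to see with concrete appends. A minimal sketch, assuming the HFile.getWriterFactory builder (withPath/withFileContext/create) and Writer#append(Cell) as they existed around this code base; the cells are hypothetical.

static void writeSortedCells(FileSystem fs, Path path, Configuration conf) throws IOException {
  HFileContext context = new HFileContextBuilder().build();
  HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
      .withPath(fs, path).withFileContext(context).create();
  try {
    KeyValue a = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    KeyValue b = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    writer.append(a);  // first cell, always accepted
    writer.append(b);  // accepted: "row2" sorts after "row1"
    writer.append(b);  // equal key: checkKey() merely reports a duplicate
    // appending a again here would fail checkKey(): "Added a key not lexically larger than previous"
  } finally {
    writer.close();    // finishClose() writes the trailer and closes the stream this writer opened
  }
}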

View File

@@ -238,7 +238,7 @@ public class FixedFileTrailer {
     BlockType.TRAILER.readAndCheck(inputStream);
     if (majorVersion > 2
-        || (majorVersion == 2 && minorVersion >= HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION)) {
+        || (majorVersion == 2 && minorVersion >= HFileReaderV2.PBUF_TRAILER_MINOR_VERSION)) {
       deserializeFromPB(inputStream);
     } else {
       deserializeFromWritable(inputStream);
@@ -611,9 +611,7 @@
   }
   public byte[] getEncryptionKey() {
-    // This is a v3 feature but if reading a v2 file the encryptionKey will just be null which
-    // if fine for this feature.
-    expectAtLeastMajorVersion(2);
+    expectAtLeastMajorVersion(3);
     return encryptionKey;
   }

View File

@@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 /**
@@ -198,8 +197,6 @@ public class HFile {
   /** API required to write an {@link HFile} */
   public interface Writer extends Closeable {
-    /** Max memstore (mvcc) timestamp in FileInfo */
-    public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
     /** Add an element to the file info map. */
     void appendFileInfo(byte[] key, byte[] value) throws IOException;
@@ -297,7 +294,7 @@
           "filesystem/path or path");
       }
       if (path != null) {
-        ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
+        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
       }
       return createWriter(fs, path, ostream,
           comparator, fileContext);
@@ -336,12 +333,9 @@
     int version = getFormatVersion(conf);
     switch (version) {
     case 2:
-      throw new IllegalArgumentException("This should never happen. " +
-        "Did you change hfile.format.version to read v2? This version of the software writes v3" +
-        " hfiles only (but it can read v2 files without having to update hfile.format.version " +
-        "in hbase-site.xml)");
+      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
     case 3:
-      return new HFileWriterFactory(conf, cacheConf);
+      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
     default:
       throw new IllegalArgumentException("Cannot create writer for HFile " +
           "format version " + version);
@@ -446,18 +440,6 @@
      * Return the file context of the HFile this reader belongs to
      */
     HFileContext getFileContext();
-    boolean shouldIncludeMemstoreTS();
-    boolean isDecodeMemstoreTS();
-    DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
-    @VisibleForTesting
-    HFileBlock.FSReader getUncachedBlockReader();
-    @VisibleForTesting
-    boolean prefetchComplete();
   }
   /**
@@ -481,10 +463,9 @@
     trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
     switch (trailer.getMajorVersion()) {
     case 2:
-      LOG.debug("Opening HFile v2 with v3 reader");
-      // Fall through.
+      return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
     case 3 :
-      return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
+      return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
     default:
       throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
     }
@@ -508,7 +489,6 @@
    * @return A version specific Hfile Reader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
-  @SuppressWarnings("resource")
   public static Reader createReader(FileSystem fs, Path path,
       FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
       throws IOException {
@@ -874,18 +854,6 @@
     }
   }
-  public static void checkHFileVersion(final Configuration c) {
-    int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
-    if (version < MAX_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
-      throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY +
-        " (in your hbase-*.xml files) is " + version + " which does not match " +
-        MAX_FORMAT_VERSION +
-        "; are you running with a configuration from an older or newer hbase install (an " +
-        "incompatible hbase-default.xml or hbase-site.xml on your CLASSPATH)?");
-    }
-  }
   public static void main(String[] args) throws Exception {
     // delegate to preserve old behavior
     HFilePrettyPrinter.main(args);
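With this revert the reader implementation is again chosen per file from the trailer's major version, not from configuration. A minimal sketch of what that means for a caller, assuming HFile.createReader(FileSystem, Path, CacheConfig, Configuration) as in this era and with fs, path and conf in scope:

HFile.Reader r = HFile.createReader(fs, path, new CacheConfig(conf), conf);
int major = r.getTrailer().getMajorVersion();  // 2 selects HFileReaderV2, 3 selects HFileReaderV3
// any other major version fails with IllegalArgumentException("Invalid HFile version ...")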

View File

@@ -1256,40 +1256,13 @@ public class HFileBlock implements Cacheable {
     /** Get the default decoder for blocks from this file. */
     HFileBlockDecodingContext getDefaultBlockDecodingContext();
-    void setIncludesMemstoreTS(boolean includesMemstoreTS);
-    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
   }
   /**
-   * We always prefetch the header of the next block, so that we know its
-   * on-disk size in advance and can read it in one operation.
+   * A common implementation of some methods of {@link FSReader} and some
+   * tools for implementing HFile format version-specific block readers.
    */
-  private static class PrefetchedHeader {
-    long offset = -1;
-    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
-    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
-  }
-  /** Reads version 2 blocks from the filesystem. */
-  static class FSReaderImpl implements FSReader {
-    /** The file system stream of the underlying {@link HFile} that
-     * does or doesn't do checksum validations in the filesystem */
-    protected FSDataInputStreamWrapper streamWrapper;
-    private HFileBlockDecodingContext encodedBlockDecodingCtx;
-    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
-    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
-    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
-        new ThreadLocal<PrefetchedHeader>() {
-      @Override
-      public PrefetchedHeader initialValue() {
-        return new PrefetchedHeader();
-      }
-    };
+  private abstract static class AbstractFSReader implements FSReader {
     /** Compression algorithm used by the {@link HFile} */
     /** The size of the file we are reading from, or -1 if unknown. */
@@ -1311,31 +1284,18 @@
     protected HFileContext fileContext;
-    public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
-        HFileContext fileContext) throws IOException {
+    public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
+        throws IOException {
       this.fileSize = fileSize;
       this.hfs = hfs;
      this.path = path;
       this.fileContext = fileContext;
       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
-      this.streamWrapper = stream;
-      // Older versions of HBase didn't support checksum.
-      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
-      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
-      encodedBlockDecodingCtx = defaultDecodingCtx;
     }
-    /**
-     * A constructor that reads files with the latest minor version.
-     * This is used by unit tests only.
-     */
-    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
-        throws IOException {
-      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
-    }
-    public BlockIterator blockRange(final long startOffset, final long endOffset) {
+    @Override
+    public BlockIterator blockRange(final long startOffset,
+        final long endOffset) {
       final FSReader owner = this; // handle for inner class
       return new BlockIterator() {
         private long offset = startOffset;
@@ -1432,6 +1392,56 @@
       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
     }
   }
+  /**
+   * We always prefetch the header of the next block, so that we know its
+   * on-disk size in advance and can read it in one operation.
+   */
+  private static class PrefetchedHeader {
+    long offset = -1;
+    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
+    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+  }
+  /** Reads version 2 blocks from the filesystem. */
+  static class FSReaderImpl extends AbstractFSReader {
+    /** The file system stream of the underlying {@link HFile} that
+     * does or doesn't do checksum validations in the filesystem */
+    protected FSDataInputStreamWrapper streamWrapper;
+    private HFileBlockDecodingContext encodedBlockDecodingCtx;
+    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
+    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
+    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
+        new ThreadLocal<PrefetchedHeader>() {
+      @Override
+      public PrefetchedHeader initialValue() {
+        return new PrefetchedHeader();
+      }
+    };
+    public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
+        HFileContext fileContext) throws IOException {
+      super(fileSize, hfs, path, fileContext);
+      this.streamWrapper = stream;
+      // Older versions of HBase didn't support checksum.
+      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
+      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
+      encodedBlockDecodingCtx = defaultDecodingCtx;
+    }
+    /**
+     * A constructor that reads files with the latest minor version.
+     * This is used by unit tests only.
+     */
+    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
+        throws IOException {
+      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
+    }
     /**
      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
      * little memory allocation as possible, using the provided on-disk size.
@@ -1672,11 +1682,11 @@
       return b;
     }
-    public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
+    void setIncludesMemstoreTS(boolean includesMemstoreTS) {
       this.fileContext.setIncludesMvcc(includesMemstoreTS);
     }
-    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
+    void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
      encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
     }
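The PrefetchedHeader machinery this hunk moves back next to FSReaderImpl exists so that, while block N is being read, the header of block N+1 is read in the same operation and cached per thread; the following read then already knows its on-disk size. A self-contained sketch of that per-thread caching idiom (class and field names, and the 33-byte header size, are assumptions for illustration, not HBase API):

final class PerThreadHeaderCacheSketch {
  static final int HDR_SIZE = 33;                  // assumed header size with checksums enabled
  static final class HeaderCache {
    long offset = -1;                              // offset the cached header belongs to
    final byte[] header = new byte[HDR_SIZE];
  }
  final ThreadLocal<HeaderCache> headerForThread = new ThreadLocal<HeaderCache>() {
    @Override protected HeaderCache initialValue() { return new HeaderCache(); }
  };
  // Read path: if headerForThread.get().offset equals the requested block offset, the
  // cached bytes already give the block's on-disk size; otherwise the header is read
  // from the stream and remembered for the next block, one cache per thread.
}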

View File

@@ -55,8 +55,8 @@ import org.apache.hadoop.util.StringUtils;
  *
  * Examples of how to use the block index writer can be found in
  * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
- * {@link HFileWriterImpl}. Examples of how to use the reader can be
- * found in {@link HFileWriterImpl} and TestHFileBlockIndex.
+ * {@link HFileWriterV2}. Examples of how to use the reader can be
+ * found in {@link HFileReaderV2} and TestHFileBlockIndex.
  */
 @InterfaceAudience.Private
 public class HFileBlockIndex {

View File

@@ -0,0 +1,358 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.security.Key;
import java.security.KeyException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
/**
* {@link HFile} reader for version 3.
*/
@InterfaceAudience.Private
public class HFileReaderV3 extends HFileReaderV2 {
private static final Log LOG = LogFactory.getLog(HFileReaderV3.class);
public static final int MAX_MINOR_VERSION = 0;
/**
* Opens a HFile. You must load the index before you can use it by calling
* {@link #loadFileInfo()}.
* @param path
* Path to HFile.
* @param trailer
* File trailer.
* @param fsdis
* input stream.
* @param size
* Length of the stream.
* @param cacheConf
* Cache configuration.
* @param hfs
* The file system.
* @param conf
* Configuration
*/
public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
final long size, final CacheConfig cacheConf, final HFileSystem hfs,
final Configuration conf) throws IOException {
super(path, trailer, fsdis, size, cacheConf, hfs, conf);
byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
// max tag length is not present in the HFile means tags were not at all written to file.
if (tmp != null) {
hfileContext.setIncludesTags(true);
tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
if (tmp != null && Bytes.toBoolean(tmp)) {
hfileContext.setCompressTags(true);
}
}
}
@Override
protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
trailer.expectMajorVersion(3);
HFileContextBuilder builder = new HFileContextBuilder()
.withIncludesMvcc(this.includesMemstoreTS)
.withHBaseCheckSum(true)
.withCompression(this.compressAlgo);
// Check for any key material available
byte[] keyBytes = trailer.getEncryptionKey();
if (keyBytes != null) {
Encryption.Context cryptoContext = Encryption.newContext(conf);
Key key;
String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
User.getCurrent().getShortName());
try {
// First try the master key
key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
} catch (KeyException e) {
// If the current master key fails to unwrap, try the alternate, if
// one is configured
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
}
String alternateKeyName =
conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
if (alternateKeyName != null) {
try {
key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
} catch (KeyException ex) {
throw new IOException(ex);
}
} else {
throw new IOException(e);
}
}
// Use the algorithm the key wants
Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
if (cipher == null) {
throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
}
cryptoContext.setCipher(cipher);
cryptoContext.setKey(key);
builder.withEncryptionContext(cryptoContext);
}
HFileContext context = builder.build();
if (LOG.isTraceEnabled()) {
LOG.trace("Reader" + (path != null ? " for " + path : "" ) +
" initialized with cacheConf: " + cacheConf +
" comparator: " + comparator.getClass().getSimpleName() +
" fileContext: " + context);
}
return context;
}
/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
* nothing to clean up in a Scanner. Letting go of your references to the
* scanner is sufficient.
* @param cacheBlocks
* True if we should cache blocks read in by this scanner.
* @param pread
* Use positional read rather than seek+read if true (pread is better
* for random reads, seek+read is better scanning).
* @param isCompaction
* is scanner being used for a compaction?
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
if (dataBlockEncoder.useEncodedScanner()) {
return new EncodedScannerV3(this, cacheBlocks, pread, isCompaction, this.hfileContext);
}
return new ScannerV3(this, cacheBlocks, pread, isCompaction);
}
/**
* Implementation of {@link HFileScanner} interface.
*/
protected static class ScannerV3 extends ScannerV2 {
private HFileReaderV3 reader;
private int currTagsLen;
public ScannerV3(HFileReaderV3 r, boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
super(r, cacheBlocks, pread, isCompaction);
this.reader = r;
}
@Override
protected int getCellBufSize() {
int kvBufSize = super.getCellBufSize();
if (reader.hfileContext.isIncludesTags()) {
kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen;
}
return kvBufSize;
}
protected void setNonSeekedState() {
super.setNonSeekedState();
currTagsLen = 0;
}
@Override
protected int getNextCellStartPosition() {
int nextKvPos = super.getNextCellStartPosition();
if (reader.hfileContext.isIncludesTags()) {
nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
}
return nextKvPos;
}
protected void readKeyValueLen() {
blockBuffer.mark();
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit()
|| currValueLen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen "
+ currValueLen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ blockBuffer.position() + " (without header).");
}
ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
if (reader.hfileContext.isIncludesTags()) {
// Read short as unsigned, high byte first
currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ blockBuffer.position() + " (without header).");
}
ByteBufferUtils.skip(blockBuffer, currTagsLen);
}
readMvccVersion();
blockBuffer.reset();
}
/**
* Within a loaded block, seek looking for the last key that is smaller than
* (or equal to?) the key we are interested in.
* A note on the seekBefore: if you have seekBefore = true, AND the first
* key in the block = key, then you'll get thrown exceptions. The caller has
* to check for that case and load the previous block as appropriate.
* @param key
* the key to find
* @param seekBefore
* find the key before the given key in case of exact match.
* @return 0 in case of an exact key match, 1 in case of an inexact match,
* -2 in case of an inexact match and furthermore, the input key
* less than the first key of current block(e.g. using a faked index
* key)
*/
@Override
protected int blockSeek(Cell key, boolean seekBefore) {
int klen, vlen, tlen = 0;
long memstoreTS = 0;
int memstoreTSLen = 0;
int lastKeyValueSize = -1;
KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
do {
blockBuffer.mark();
klen = blockBuffer.getInt();
vlen = blockBuffer.getInt();
if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
|| vlen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid klen " + klen + " or vlen "
+ vlen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ blockBuffer.position() + " (without header).");
}
ByteBufferUtils.skip(blockBuffer, klen + vlen);
if (reader.hfileContext.isIncludesTags()) {
// Read short as unsigned, high byte first
tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
if (tlen < 0 || tlen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ blockBuffer.position() + " (without header).");
}
ByteBufferUtils.skip(blockBuffer, tlen);
}
if (this.reader.shouldIncludeMemstoreTS()) {
if (this.reader.decodeMemstoreTS) {
try {
memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position());
memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
} catch (Exception e) {
throw new RuntimeException("Error reading memstore timestamp", e);
}
} else {
memstoreTS = 0;
memstoreTSLen = 1;
}
}
blockBuffer.reset();
int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv);
if (comp == 0) {
if (seekBefore) {
if (lastKeyValueSize < 0) {
throw new IllegalStateException("blockSeek with seekBefore "
+ "at the first key of the block: key="
+ CellUtil.getCellKeyAsString(key)
+ ", blockOffset=" + block.getOffset() + ", onDiskSize="
+ block.getOnDiskSizeWithHeader());
}
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
readKeyValueLen();
return 1; // non exact match.
}
currKeyLen = klen;
currValueLen = vlen;
currTagsLen = tlen;
if (this.reader.shouldIncludeMemstoreTS()) {
currMemstoreTS = memstoreTS;
currMemstoreTSLen = memstoreTSLen;
}
return 0; // indicate exact match
} else if (comp < 0) {
if (lastKeyValueSize > 0)
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
readKeyValueLen();
if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
return HConstants.INDEX_KEY_MAGIC;
}
return 1;
}
// The size of this key/value tuple, including key/value length fields.
lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
// include tag length also if tags included with KV
if (reader.hfileContext.isIncludesTags()) {
lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
}
blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
} while (blockBuffer.remaining() > 0);
// Seek to the last key we successfully read. This will happen if this is
// the last key/value pair in the file, in which case the following call
// to next() has to return false.
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
readKeyValueLen();
return 1; // didn't exactly find it.
}
}
/**
* ScannerV3 that operates on encoded data blocks.
*/
protected static class EncodedScannerV3 extends EncodedScannerV2 {
public EncodedScannerV3(HFileReaderV3 reader, boolean cacheBlocks, boolean pread,
boolean isCompaction, HFileContext context) {
super(reader, cacheBlocks, pread, isCompaction, context);
}
}
@Override
public int getMajorVersion() {
return 3;
}
}
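Both readKeyValueLen() and blockSeek() above decode the tags length as a two-byte, big-endian unsigned value via ((b1 & 0xff) << 8) ^ (b2 & 0xff); the XOR acts like an OR here because the shifted high byte and the low byte never overlap. A small worked example with hypothetical on-disk bytes:

public class TagLenDecodeSketch {
  public static void main(String[] args) {
    byte hi = 0x01, lo = 0x2C;                       // hypothetical tag-length bytes from a block
    int tagsLen = ((hi & 0xff) << 8) ^ (lo & 0xff);  // same expression the scanner uses
    System.out.println(tagsLen);                     // prints 300 (0x012C), range 0..65535
  }
}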

View File

@@ -1,40 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
public class HFileWriterFactory extends HFile.WriterFactory {
HFileWriterFactory(Configuration conf, CacheConfig cacheConf) {
super(conf, cacheConf);
}
@Override
public HFile.Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
KVComparator comparator, HFileContext context)
throws IOException {
return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, context);
}
}

View File

@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -21,7 +22,6 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,105 +31,40 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
 /**
- * Common functionality needed by all versions of {@link HFile} writers.
+ * Writes HFile format version 2.
  */
 @InterfaceAudience.Private
-public class HFileWriterImpl implements HFile.Writer {
-  private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
-  /** The Cell previously appended. Becomes the last cell in the file.*/
-  protected Cell lastCell = null;
-  /** FileSystem stream to write into. */
-  protected FSDataOutputStream outputStream;
-  /** True if we opened the <code>outputStream</code> (and so will close it). */
-  protected final boolean closeOutputStream;
-  /** A "file info" block: a key-value map of file-wide metadata. */
-  protected FileInfo fileInfo = new HFile.FileInfo();
-  /** Total # of key/value entries, i.e. how many times add() was called. */
-  protected long entryCount = 0;
-  /** Used for calculating the average key length. */
-  protected long totalKeyLength = 0;
-  /** Used for calculating the average value length. */
-  protected long totalValueLength = 0;
-  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
-  protected long totalUncompressedBytes = 0;
-  /** Key comparator. Used to ensure we write in order. */
-  protected final KVComparator comparator;
-  /** Meta block names. */
-  protected List<byte[]> metaNames = new ArrayList<byte[]>();
-  /** {@link Writable}s representing meta block data. */
-  protected List<Writable> metaData = new ArrayList<Writable>();
-  /**
-   * First cell in a block.
-   * This reference should be short-lived since we write hfiles in a burst.
-   */
-  protected Cell firstCellInBlock = null;
-  /** May be null if we were passed a stream. */
-  protected final Path path;
-  /** Cache configuration for caching data on write. */
-  protected final CacheConfig cacheConf;
-  /**
-   * Name for this object used when logging or in toString. Is either
-   * the result of a toString on stream or else name of passed file Path.
-   */
-  protected final String name;
-  /**
-   * The data block encoding which will be used.
-   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
-   */
-  protected final HFileDataBlockEncoder blockEncoder;
-  protected final HFileContext hFileContext;
-  private int maxTagsLength = 0;
+public class HFileWriterV2 extends AbstractHFileWriter {
+  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
+  /** Max memstore (mvcc) timestamp in FileInfo */
+  public static final byte [] MAX_MEMSTORE_TS_KEY =
+      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
   /** KeyValue version in FileInfo */
-  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+  public static final byte [] KEY_VALUE_VERSION =
+      Bytes.toBytes("KEY_VALUE_VERSION");
   /** Version for KeyValue which includes memstore timestamp */
   public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
   /** Inline block writers for multi-level block index and compound Blooms. */
-  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<InlineBlockWriter>();
-  /** block writer */
+  private List<InlineBlockWriter> inlineBlockWriters =
+      new ArrayList<InlineBlockWriter>();
+  /** Unified version 2 block writer */
   protected HFileBlock.Writer fsBlockWriter;
   private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
@ -148,135 +83,40 @@ public class HFileWriterImpl implements HFile.Writer {
private Cell lastCellOfPreviousBlock = null; private Cell lastCellOfPreviousBlock = null;
/** Additional data items to be written to the "load-on-open" section. */ /** Additional data items to be written to the "load-on-open" section. */
private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<BlockWritable>(); private List<BlockWritable> additionalLoadOnOpenData =
new ArrayList<BlockWritable>();
protected long maxMemstoreTS = 0; protected long maxMemstoreTS = 0;
public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path, static class WriterFactoryV2 extends HFile.WriterFactory {
FSDataOutputStream outputStream, WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
KVComparator comparator, HFileContext fileContext) { super(conf, cacheConf);
this.outputStream = outputStream;
this.path = path;
this.name = path != null ? path.getName() : outputStream.toString();
this.hFileContext = fileContext;
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
if (encoding != DataBlockEncoding.NONE) {
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
} else {
this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
} }
this.comparator = comparator != null ? comparator
: KeyValue.COMPARATOR;
closeOutputStream = path != null; @Override
this.cacheConf = cacheConf; public Writer createWriter(FileSystem fs, Path path,
finishInit(conf); FSDataOutputStream ostream,
if (LOG.isTraceEnabled()) { KVComparator comparator, HFileContext context) throws IOException {
LOG.trace("Writer" + (path != null ? " for " + path : "") + context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
" initialized with cacheConf: " + cacheConf + return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
" comparator: " + comparator.getClass().getSimpleName() + comparator, context);
" fileContext: " + fileContext);
}
}
/**
* Add to the file info. All added key/value pairs can be obtained using
* {@link HFile.Reader#loadFileInfo()}.
*
* @param k Key
* @param v Value
* @throws IOException in case the key or the value are invalid
*/
@Override
public void appendFileInfo(final byte[] k, final byte[] v)
throws IOException {
fileInfo.append(k, v, true);
}
/**
* Sets the file info offset in the trailer, finishes up populating fields in
* the file info, and writes the file info into the given data output. The
* reason the data output is not always {@link #outputStream} is that we store
* file info as a block in version 2.
*
* @param trailer fixed file trailer
* @param out the data output to write the file info to
* @throws IOException
*/
protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
throws IOException {
trailer.setFileInfoOffset(outputStream.getPos());
finishFileInfo();
fileInfo.write(out);
}
/**
* Checks that the given Cell's key does not violate the key order.
*
* @param cell Cell whose key to check.
* @return true if the key is duplicate
* @throws IOException if the key or the key order is wrong
*/
protected boolean checkKey(final Cell cell) throws IOException {
boolean isDuplicateKey = false;
if (cell == null) {
throw new IOException("Key cannot be null or empty");
}
if (lastCell != null) {
int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
if (keyComp > 0) {
throw new IOException("Added a key not lexically larger than"
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
} else if (keyComp == 0) {
isDuplicateKey = true;
} }
} }
return isDuplicateKey;
}
  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset,
      final int length) throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /** Constructor that takes a path, creates and closes the output stream. */
  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
      FileSystem fs, Path path, FSDataOutputStream ostream,
      final KVComparator comparator, final HFileContext context) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
        path, comparator, context);
    finishInit(conf);
/**
* @return Path or null if we were passed a stream rather than a Path.
*/
@Override
public Path getPath() {
return path;
}
@Override
public String toString() {
return "writer=" + (path != null ? path.toString() : null) + ", name="
+ name + ", compression=" + hFileContext.getCompression().getName();
}
public static Compression.Algorithm compressionByName(String algoName) {
if (algoName == null)
return HFile.DEFAULT_COMPRESSION_ALGORITHM;
return Compression.getCompressionAlgorithmByName(algoName);
}
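compressionByName() is the usual bridge from a codec name in configuration to a Compression.Algorithm, as the tool and test hunks later in this diff show. A small sketch of the common pattern, assuming a Configuration conf is in scope (on the other side of this revert the same helper lives on AbstractHFileWriter):

String codec = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
HFileContext context = new HFileContextBuilder()
    .withBlockSize(64 * 1024)
    .withCompression(HFileWriterImpl.compressionByName(codec))
    .build();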
/** A helper method to create HFile output streams in constructors */
protected static FSDataOutputStream createOutputStream(Configuration conf,
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
return FSUtils.create(fs, path, perms, favoredNodes);
} }
/** Additional initialization steps */ /** Additional initialization steps */
protected void finishInit(final Configuration conf) { protected void finishInit(final Configuration conf) {
if (fsBlockWriter != null) { if (fsBlockWriter != null)
throw new IllegalStateException("finishInit called twice"); throw new IllegalStateException("finishInit called twice");
}
fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext); fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
@ -300,7 +140,9 @@ public class HFileWriterImpl implements HFile.Writer {
* @throws IOException * @throws IOException
*/ */
protected void checkBlockBoundary() throws IOException { protected void checkBlockBoundary() throws IOException {
if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return; if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
return;
finishBlock(); finishBlock();
writeInlineBlocks(false); writeInlineBlocks(false);
newBlock(); newBlock();
@ -308,7 +150,8 @@ public class HFileWriterImpl implements HFile.Writer {
/** Clean up the current data block */ /** Clean up the current data block */
private void finishBlock() throws IOException { private void finishBlock() throws IOException {
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
return;
// Update the first data block offset for scanning. // Update the first data block offset for scanning.
if (firstDataBlockOffset == -1) { if (firstDataBlockOffset == -1) {
@ -318,6 +161,7 @@ public class HFileWriterImpl implements HFile.Writer {
lastDataBlockOffset = outputStream.getPos(); lastDataBlockOffset = outputStream.getPos();
fsBlockWriter.writeHeaderAndData(outputStream); fsBlockWriter.writeHeaderAndData(outputStream);
int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader(); int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
Cell indexEntry = Cell indexEntry =
CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock); CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry), dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
@ -355,7 +199,8 @@ public class HFileWriterImpl implements HFile.Writer {
*/ */
private void doCacheOnWrite(long offset) { private void doCacheOnWrite(long offset) {
HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock); cacheConf.getBlockCache().cacheBlock(
new BlockCacheKey(name, offset), cacheFormatBlock);
} }
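doCacheOnWrite() only has an effect when cache-on-write is enabled in the CacheConfig. A hedged sketch of turning it on; the "hbase.rs.cacheblocksonwrite" property name is an assumption, not something this diff defines:

Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.rs.cacheblocksonwrite", true); // assumed property name for data blocks
CacheConfig cacheConf = new CacheConfig(conf);
// Writers built with this cacheConf hand each finished block to the block cache.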
/** /**
@ -399,6 +244,48 @@ public class HFileWriterImpl implements HFile.Writer {
metaData.add(i, content); metaData.add(i, content);
} }
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param cell Cell to add. Cannot be empty nor null.
* @throws IOException
*/
@Override
public void append(final Cell cell) throws IOException {
byte[] value = cell.getValueArray();
int voffset = cell.getValueOffset();
int vlength = cell.getValueLength();
// checkKey uses comparator to check we are writing in order.
boolean dupKey = checkKey(cell);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
if (!fsBlockWriter.isWriting()) {
newBlock();
}
fsBlockWriter.write(cell);
totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
totalValueLength += vlength;
// Are we the first key in this block?
if (firstCellInBlock == null) {
// If cell is big, block will be closed and this firstCellInBlock reference will only last
// a short while.
firstCellInBlock = cell;
}
// TODO: What if cell is 10MB and we write infrequently? We'll hold on to the cell here
// indefinitely?
lastCell = cell;
entryCount++;
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
}
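The append path above enforces key ordering via checkKey(), rolls to a new block once the configured block size is reached, and tracks the maximum memstore timestamp. A usage sketch, assuming a writer built as in the factory examples earlier in this file:

byte[] family = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("q");
for (int i = 0; i < 100; i++) {
  // Zero-padded rows keep lexicographic order in line with numeric order,
  // so checkKey() never sees an out-of-order cell.
  byte[] row = Bytes.toBytes(String.format("row-%05d", i));
  writer.append(new KeyValue(row, family, qualifier, Bytes.toBytes("value-" + i)));
}
writer.close();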
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (outputStream == null) { if (outputStream == null) {
@ -522,120 +409,16 @@ public class HFileWriterImpl implements HFile.Writer {
}); });
} }
protected int getMajorVersion() {
return 2;
}
protected int getMinorVersion() {
return HFileReaderV2.MAX_MINOR_VERSION;
}
@Override @Override
public HFileContext getFileContext() { public HFileContext getFileContext() {
return hFileContext; return hFileContext;
} }
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param cell
* Cell to add. Cannot be empty nor null.
* @throws IOException
*/
@Override
public void append(final Cell cell) throws IOException {
byte[] value = cell.getValueArray();
int voffset = cell.getValueOffset();
int vlength = cell.getValueLength();
// checkKey uses comparator to check we are writing in order.
boolean dupKey = checkKey(cell);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
if (!fsBlockWriter.isWriting()) {
newBlock();
}
fsBlockWriter.write(cell);
totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
totalValueLength += vlength;
// Are we the first key in this block?
if (firstCellInBlock == null) {
// If cell is big, block will be closed and this firstCellInBlock reference will only last
// a short while.
firstCellInBlock = cell;
}
// TODO: What if cell is 10MB and we write infrequently? We hold on to the cell here indefinitely?
lastCell = cell;
entryCount++;
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
int tagsLength = cell.getTagsLength();
if (tagsLength > this.maxTagsLength) {
this.maxTagsLength = tagsLength;
}
}
protected void finishFileInfo() throws IOException {
if (lastCell != null) {
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
// byte buffer. Won't take a tuple.
byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
fileInfo.append(FileInfo.LASTKEY, lastKey, false);
}
// Average key length.
int avgKeyLen =
entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
false);
// Average value length.
int avgValueLen =
entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
// In case of Prefix Tree encoding, we always write tags information into HFiles even if all
// KVs have no tags.
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
} else if (hFileContext.isIncludesTags()) {
// Tags are being written to this file, so record the maximum tag length and whether the
// tags are compressed. (MAX_TAGS_LEN is excluded from the FileInfo only when no tags are written.)
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
&& hFileContext.isCompressTags();
fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
}
}
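The averages and tag metadata recorded above surface on the read side through the same file info map; a sketch, assuming an open HFile.Reader named reader:

Map<byte[], byte[]> info = reader.loadFileInfo();
int avgKeyLen = Bytes.toInt(info.get(FileInfo.AVG_KEY_LEN));
int avgValueLen = Bytes.toInt(info.get(FileInfo.AVG_VALUE_LEN));
byte[] maxTagsLen = info.get(FileInfo.MAX_TAGS_LEN); // null when no tags were written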
protected int getMajorVersion() {
return 3;
}
protected int getMinorVersion() {
return HFileReaderImpl.MAX_MINOR_VERSION;
}
protected void finishClose(FixedFileTrailer trailer) throws IOException {
// Write out encryption metadata before finalizing if we have a valid crypto context
Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
if (cryptoContext != Encryption.Context.NONE) {
// Wrap the context's key and write it as the encryption metadata, the wrapper includes
// all information needed for decryption
trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
User.getCurrent().getShortName()),
cryptoContext.getKey()));
}
// Now we can finish the close
trailer.setMetaIndexCount(metaNames.size());
trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
trailer.setEntryCount(entryCount);
trailer.setCompressionCodec(hFileContext.getCompression());
trailer.serialize(outputStream);
if (closeOutputStream) {
outputStream.close();
outputStream = null;
}
}
}
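finishClose() serializes the trailer as the last bytes of the file, and the v2/v3 writer tests below read it back the same way. A sketch, assuming fs and hfilePath refer to a freshly written file:

FSDataInputStream fsdis = fs.open(hfilePath);
long fileSize = fs.getFileStatus(hfilePath).getLen();
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);
// Major version and entry count were filled in during close.
System.out.println("major=" + trailer.getMajorVersion() + " entries=" + trailer.getEntryCount());
fsdis.close();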
View File
@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
/**
* {@link HFile} writer for version 3.
*/
@InterfaceAudience.Private
public class HFileWriterV3 extends HFileWriterV2 {
private static final Log LOG = LogFactory.getLog(HFileWriterV3.class);
private int maxTagsLength = 0;
static class WriterFactoryV3 extends HFile.WriterFactory {
WriterFactoryV3(Configuration conf, CacheConfig cacheConf) {
super(conf, cacheConf);
}
@Override
public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
final KVComparator comparator, HFileContext fileContext)
throws IOException {
return new HFileWriterV3(conf, cacheConf, fs, path, ostream, comparator, fileContext);
}
}
/** Constructor that takes a path, creates and closes the output stream. */
public HFileWriterV3(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path,
FSDataOutputStream ostream, final KVComparator comparator,
final HFileContext fileContext) throws IOException {
super(conf, cacheConf, fs, path, ostream, comparator, fileContext);
if (LOG.isTraceEnabled()) {
LOG.trace("Writer" + (path != null ? " for " + path : "") +
" initialized with cacheConf: " + cacheConf +
" comparator: " + comparator.getClass().getSimpleName() +
" fileContext: " + fileContext);
}
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param cell
* Cell to add. Cannot be empty nor null.
* @throws IOException
*/
@Override
public void append(final Cell cell) throws IOException {
// Currently get the complete arrays
super.append(cell);
int tagsLength = cell.getTagsLength();
if (tagsLength > this.maxTagsLength) {
this.maxTagsLength = tagsLength;
}
}
protected void finishFileInfo() throws IOException {
super.finishFileInfo();
if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
// In case of Prefix Tree encoding, we always write tags information into HFiles even if all
// KVs have no tags.
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
} else if (hFileContext.isIncludesTags()) {
// Tags are being written to this file, so record the maximum tag length and whether the
// tags are compressed. (MAX_TAGS_LEN is excluded from the FileInfo only when no tags are written.)
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
&& hFileContext.isCompressTags();
fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
}
}
@Override
protected int getMajorVersion() {
return 3;
}
@Override
protected int getMinorVersion() {
return HFileReaderV3.MAX_MINOR_VERSION;
}
@Override
protected void finishClose(FixedFileTrailer trailer) throws IOException {
// Write out encryption metadata before finalizing if we have a valid crypto context
Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
if (cryptoContext != Encryption.Context.NONE) {
// Wrap the context's key and write it as the encryption metadata, the wrapper includes
// all information needed for decryption
trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
User.getCurrent().getShortName()),
cryptoContext.getKey()));
}
// Now we can finish the close
super.finishClose(trailer);
}
}
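Which writer the factory hands back is driven by hfile.format.version, which several of the test hunks below toggle between 2 and 3. A sketch of requesting a v3 (tag-capable) file, with fs and path assumed to exist:

Configuration conf = HBaseConfiguration.create();
conf.setInt(HFile.FORMAT_VERSION_KEY, 3); // "hfile.format.version"; use 2 for tag-less files
HFileContext context = new HFileContextBuilder().withIncludesTags(true).build();
HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
    .withPath(fs, path)
    .withFileContext(context)
    .create();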
View File
@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
@ -129,7 +129,7 @@ public class HFileOutputFormat2
// Invented config. Add to hbase-*.xml if other than default compression. // Invented config. Add to hbase-*.xml if other than default compression.
final String defaultCompressionStr = conf.get("hfile.compression", final String defaultCompressionStr = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName()); Compression.Algorithm.NONE.getName());
final Algorithm defaultCompression = HFileWriterImpl final Algorithm defaultCompression = AbstractHFileWriter
.compressionByName(defaultCompressionStr); .compressionByName(defaultCompressionStr);
final boolean compactionExclude = conf.getBoolean( final boolean compactionExclude = conf.getBoolean(
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false); "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
@ -483,7 +483,7 @@ public class HFileOutputFormat2
Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
Algorithm>(Bytes.BYTES_COMPARATOR); Algorithm>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) { for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue()); Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
compressionMap.put(e.getKey(), algorithm); compressionMap.put(e.getKey(), algorithm);
} }
return compressionMap; return compressionMap;
View File
@ -93,7 +93,6 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -494,7 +493,6 @@ public class HRegionServer extends HasThread implements
throws IOException { throws IOException {
this.fsOk = true; this.fsOk = true;
this.conf = conf; this.conf = conf;
HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf); checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf); this.userProvider = UserProvider.instantiate(conf);
FSUtils.setupShortCircuitRead(this.conf); FSUtils.setupShortCircuitRead(this.conf);
View File
@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver {
*/ */
long getOldestHfileTs(boolean majorCompactioOnly) throws IOException; long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
/** /**
* @return map of column family names to max sequence id that was read from storage when this * @return map of column family names to max sequence id that was read from storage when this
* region was opened * region was opened
*/ */
@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver {
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
// Metrics // Metrics
/** @return read requests count for this region */ /** @return read requests count for this region */
long getReadRequestsCount(); long getReadRequestsCount();
@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver {
/** @return the number of mutations processed bypassing the WAL */ /** @return the number of mutations processed bypassing the WAL */
long getNumMutationsWithoutWAL(); long getNumMutationsWithoutWAL();
/** @return the size of data processed bypassing the WAL, in bytes */ /** @return the size of data processed bypassing the WAL, in bytes */
long getDataInMemoryWithoutWAL(); long getDataInMemoryWithoutWAL();
@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver {
/** /**
* This method needs to be called before any public call that reads or * This method needs to be called before any public call that reads or
* modifies data. * modifies data.
* Acquires a read lock and checks if the region is closing or closed. * Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after * <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed. * the operation has completed, whether it succeeded or failed.
@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver {
/** /**
* This method needs to be called before any public call that reads or * This method needs to be called before any public call that reads or
* modifies data. * modifies data.
* Acquires a read lock and checks if the region is closing or closed. * Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after * <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed. * the operation has completed, whether it succeeded or failed.
@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver {
/** /**
* Perform atomic mutations within the region. * Perform atomic mutations within the region.
* *
* @param mutations The list of mutations to perform. * @param mutations The list of mutations to perform.
* <code>mutations</code> can contain operations for multiple rows. * <code>mutations</code> can contain operations for multiple rows.
* Caller has to ensure that all rows are contained in this region. * Caller has to ensure that all rows are contained in this region.
@ -588,7 +588,7 @@ public interface Region extends ConfigurationObserver {
byte[] now) throws IOException; byte[] now) throws IOException;
/** /**
* Replace any cell timestamps set to {@link org.apache.hadoop.hbase.HConstants#LATEST_TIMESTAMP} * Replace any cell timestamps set to HConstants#LATEST_TIMESTAMP with the
* provided current timestamp. * provided current timestamp.
* @param values * @param values
* @param now * @param now
@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver {
CANNOT_FLUSH_MEMSTORE_EMPTY, CANNOT_FLUSH_MEMSTORE_EMPTY,
CANNOT_FLUSH CANNOT_FLUSH
} }
/** @return the detailed result code */ /** @return the detailed result code */
Result getResult(); Result getResult();
/** @return true if the memstores were flushed, else false */ /** @return true if the memstores were flushed, else false */
boolean isFlushSucceeded(); boolean isFlushSucceeded();
/** @return True if the flush requested a compaction, else false */ /** @return True if the flush requested a compaction, else false */
boolean isCompactionNeeded(); boolean isCompactionNeeded();
} }
@ -647,7 +647,7 @@ public interface Region extends ConfigurationObserver {
* Synchronously compact all stores in the region. * Synchronously compact all stores in the region.
* <p>This operation could block for a long time, so don't call it from a * <p>This operation could block for a long time, so don't call it from a
* time-sensitive thread. * time-sensitive thread.
* <p>Note that no locks are taken to prevent possible conflicts between * <p>Note that no locks are taken to prevent possible conflicts between
* compaction and splitting activities. The regionserver does not normally compact * compaction and splitting activities. The regionserver does not normally compact
* and split in parallel. However by calling this method you may introduce * and split in parallel. However by calling this method you may introduce
* unexpected and unhandled concurrency. Don't do this unless you know what * unexpected and unhandled concurrency. Don't do this unless you know what
View File
@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterFactory;
@ -408,7 +409,7 @@ public class StoreFile {
} }
this.reader.setSequenceID(this.sequenceid); this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY); b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (b != null) { if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b); this.maxMemstoreTS = Bytes.toLong(b);
} }
View File
@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
@ -142,7 +142,7 @@ public abstract class Compactor {
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID()); fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
} }
else { else {
tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY); tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (tmp != null) { if (tmp != null) {
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp)); fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
} }
View File
@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
@ -120,7 +120,7 @@ public class CompressionTest {
throws Exception { throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
HFileContext context = new HFileContextBuilder() HFileContext context = new HFileContextBuilder()
.withCompression(HFileWriterImpl.compressionByName(codec)).build(); .withCompression(AbstractHFileWriter.compressionByName(codec)).build();
HFile.Writer writer = HFile.getWriterFactoryNoCache(conf) HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
.withPath(fs, path) .withPath(fs, path)
.withFileContext(context) .withFileContext(context)
View File
@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.crypto.aes.AES; import org.apache.hadoop.hbase.io.crypto.aes.AES;
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
@ -326,7 +326,7 @@ public class HFilePerformanceEvaluation {
void setUp() throws Exception { void setUp() throws Exception {
HFileContextBuilder builder = new HFileContextBuilder() HFileContextBuilder builder = new HFileContextBuilder()
.withCompression(HFileWriterImpl.compressionByName(codec)) .withCompression(AbstractHFileWriter.compressionByName(codec))
.withBlockSize(RFILE_BLOCKSIZE); .withBlockSize(RFILE_BLOCKSIZE);
if (cipher == "aes") { if (cipher == "aes") {
View File
@ -242,6 +242,7 @@ public class TestCacheOnWrite {
public void setUp() throws IOException { public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration(); conf = TEST_UTIL.getConfiguration();
this.conf.set("dfs.datanode.data.dir.perm", "700"); this.conf.set("dfs.datanode.data.dir.perm", "700");
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
BLOOM_BLOCK_SIZE); BLOOM_BLOCK_SIZE);
@ -271,7 +272,12 @@ public class TestCacheOnWrite {
} }
private void readStoreFile(boolean useTags) throws IOException { private void readStoreFile(boolean useTags) throws IOException {
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf); AbstractHFileReader reader;
if (useTags) {
reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
} else {
reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
}
LOG.info("HFile information: " + reader); LOG.info("HFile information: " + reader);
HFileContext meta = new HFileContextBuilder().withCompression(compress) HFileContext meta = new HFileContextBuilder().withCompression(compress)
.withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
@ -372,6 +378,11 @@ public class TestCacheOnWrite {
} }
private void writeStoreFile(boolean useTags) throws IOException { private void writeStoreFile(boolean useTags) throws IOException {
if(useTags) {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
} else {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
}
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
"test_cache_on_write"); "test_cache_on_write");
HFileContext meta = new HFileContextBuilder().withCompression(compress) HFileContext meta = new HFileContextBuilder().withCompression(compress)
@ -411,6 +422,11 @@ public class TestCacheOnWrite {
private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (useTags) {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
} else {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
}
// TODO: need to change this test if we add a cache size threshold for // TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for // compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction. // deciding what blocks to cache-on-write on compaction.
View File
@ -90,7 +90,7 @@ public class TestFixedFileTrailer {
@Test @Test
public void testTrailer() throws IOException { public void testTrailer() throws IOException {
FixedFileTrailer t = new FixedFileTrailer(version, FixedFileTrailer t = new FixedFileTrailer(version,
HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION); HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
t.setDataIndexCount(3); t.setDataIndexCount(3);
t.setEntryCount(((long) Integer.MAX_VALUE) + 1); t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
@ -123,7 +123,7 @@ public class TestFixedFileTrailer {
{ {
DataInputStream dis = new DataInputStream(bais); DataInputStream dis = new DataInputStream(bais);
FixedFileTrailer t2 = new FixedFileTrailer(version, FixedFileTrailer t2 = new FixedFileTrailer(version,
HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION); HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
t2.deserialize(dis); t2.deserialize(dis);
assertEquals(-1, bais.read()); // Ensure we have read everything. assertEquals(-1, bais.read()); // Ensure we have read everything.
checkLoadedTrailer(version, t, t2); checkLoadedTrailer(version, t, t2);
@ -172,7 +172,7 @@ public class TestFixedFileTrailer {
public void testTrailerForV2NonPBCompatibility() throws Exception { public void testTrailerForV2NonPBCompatibility() throws Exception {
if (version == 2) { if (version == 2) {
FixedFileTrailer t = new FixedFileTrailer(version, FixedFileTrailer t = new FixedFileTrailer(version,
HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM); HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
t.setDataIndexCount(3); t.setDataIndexCount(3);
t.setEntryCount(((long) Integer.MAX_VALUE) + 1); t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
t.setLastDataBlockOffset(291); t.setLastDataBlockOffset(291);
@ -199,7 +199,7 @@ public class TestFixedFileTrailer {
{ {
DataInputStream dis = new DataInputStream(bais); DataInputStream dis = new DataInputStream(bais);
FixedFileTrailer t2 = new FixedFileTrailer(version, FixedFileTrailer t2 = new FixedFileTrailer(version,
HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM); HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
t2.deserialize(dis); t2.deserialize(dis);
assertEquals(-1, bais.read()); // Ensure we have read everything. assertEquals(-1, bais.read()); // Ensure we have read everything.
checkLoadedTrailer(version, t, t2); checkLoadedTrailer(version, t, t2);
View File
@ -82,6 +82,8 @@ public class TestForceCacheImportantBlocks {
public static Collection<Object[]> parameters() { public static Collection<Object[]> parameters() {
// HFile versions // HFile versions
return Arrays.asList( return Arrays.asList(
new Object[] { 2, true },
new Object[] { 2, false },
new Object[] { 3, true }, new Object[] { 3, true },
new Object[] { 3, false } new Object[] { 3, false }
); );
View File
@ -246,7 +246,7 @@ public class TestHFile extends HBaseTestCase {
FSDataOutputStream fout = createFSOutput(ncTFile); FSDataOutputStream fout = createFSOutput(ncTFile);
HFileContext meta = new HFileContextBuilder() HFileContext meta = new HFileContextBuilder()
.withBlockSize(minBlockSize) .withBlockSize(minBlockSize)
.withCompression(HFileWriterImpl.compressionByName(codec)) .withCompression(AbstractHFileWriter.compressionByName(codec))
.build(); .build();
Writer writer = HFile.getWriterFactory(conf, cacheConf) Writer writer = HFile.getWriterFactory(conf, cacheConf)
.withOutputStream(fout) .withOutputStream(fout)
@ -339,7 +339,7 @@ public class TestHFile extends HBaseTestCase {
Path mFile = new Path(ROOT_DIR, "meta.hfile"); Path mFile = new Path(ROOT_DIR, "meta.hfile");
FSDataOutputStream fout = createFSOutput(mFile); FSDataOutputStream fout = createFSOutput(mFile);
HFileContext meta = new HFileContextBuilder() HFileContext meta = new HFileContextBuilder()
.withCompression(HFileWriterImpl.compressionByName(compress)) .withCompression(AbstractHFileWriter.compressionByName(compress))
.withBlockSize(minBlockSize).build(); .withBlockSize(minBlockSize).build();
Writer writer = HFile.getWriterFactory(conf, cacheConf) Writer writer = HFile.getWriterFactory(conf, cacheConf)
.withOutputStream(fout) .withOutputStream(fout)
View File
@ -590,7 +590,7 @@ public class TestHFileBlockIndex {
} }
// Manually compute the mid-key and validate it. // Manually compute the mid-key and validate it.
HFile.Reader reader2 = reader; HFileReaderV2 reader2 = (HFileReaderV2) reader;
HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader(); HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader();
HFileBlock.BlockIterator iter = fsReader.blockRange(0, HFileBlock.BlockIterator iter = fsReader.blockRange(0,
View File
@ -55,7 +55,8 @@ public class TestHFileInlineToRootChunkConversion {
CacheConfig cacheConf = new CacheConfig(conf); CacheConfig cacheConf = new CacheConfig(conf);
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize); conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize);
HFileContext context = new HFileContextBuilder().withBlockSize(16).build(); HFileContext context = new HFileContextBuilder().withBlockSize(16).build();
HFile.Writer hfw = new HFileWriterFactory(conf, cacheConf) HFileWriterV2 hfw =
(HFileWriterV2) new HFileWriterV2.WriterFactoryV2(conf, cacheConf)
.withFileContext(context) .withFileContext(context)
.withPath(fs, hfPath).create(); .withPath(fs, hfPath).create();
List<byte[]> keys = new ArrayList<byte[]>(); List<byte[]> keys = new ArrayList<byte[]>();
@ -77,7 +78,7 @@ public class TestHFileInlineToRootChunkConversion {
} }
hfw.close(); hfw.close();
HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf); HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs, hfPath, cacheConf, conf);
// Scanner doesn't do Cells yet. Fix. // Scanner doesn't do Cells yet. Fix.
HFileScanner scanner = reader.getScanner(true, true); HFileScanner scanner = reader.getScanner(true, true);
for (int i = 0; i < keys.size(); ++i) { for (int i = 0; i < keys.size(); ++i) {
@ -85,4 +86,4 @@ public class TestHFileInlineToRootChunkConversion {
} }
reader.close(); reader.close();
} }
} }
View File
@ -130,7 +130,7 @@ public class TestHFileSeek extends TestCase {
try { try {
HFileContext context = new HFileContextBuilder() HFileContext context = new HFileContextBuilder()
.withBlockSize(options.minBlockSize) .withBlockSize(options.minBlockSize)
.withCompression(HFileWriterImpl.compressionByName(options.compress)) .withCompression(AbstractHFileWriter.compressionByName(options.compress))
.build(); .build();
Writer writer = HFile.getWriterFactoryNoCache(conf) Writer writer = HFile.getWriterFactoryNoCache(conf)
.withOutputStream(fout) .withOutputStream(fout)
View File
@ -56,7 +56,7 @@ import org.junit.experimental.categories.Category;
/** /**
* Testing writing a version 2 {@link HFile}. This is a low-level test written * Testing writing a version 2 {@link HFile}. This is a low-level test written
* during the development of {@link HFileWriterImpl}. * during the development of {@link HFileWriterV2}.
*/ */
@Category({IOTests.class, SmallTests.class}) @Category({IOTests.class, SmallTests.class})
public class TestHFileWriterV2 { public class TestHFileWriterV2 {
@ -99,7 +99,8 @@ public class TestHFileWriterV2 {
.withBlockSize(4096) .withBlockSize(4096)
.withCompression(compressAlgo) .withCompression(compressAlgo)
.build(); .build();
HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf)) HFileWriterV2 writer = (HFileWriterV2)
new HFileWriterV2.WriterFactoryV2(conf, new CacheConfig(conf))
.withPath(fs, hfilePath) .withPath(fs, hfilePath)
.withFileContext(context) .withFileContext(context)
.create(); .create();
@ -135,6 +136,7 @@ public class TestHFileWriterV2 {
FixedFileTrailer trailer = FixedFileTrailer trailer =
FixedFileTrailer.readFromStream(fsdis, fileSize); FixedFileTrailer.readFromStream(fsdis, fileSize);
assertEquals(2, trailer.getMajorVersion());
assertEquals(entryCount, trailer.getEntryCount()); assertEquals(entryCount, trailer.getEntryCount());
HFileContext meta = new HFileContextBuilder() HFileContext meta = new HFileContextBuilder()
@ -175,7 +177,8 @@ public class TestHFileWriterV2 {
// File info // File info
FileInfo fileInfo = new FileInfo(); FileInfo fileInfo = new FileInfo();
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION); byte [] keyValueFormatVersion = fileInfo.get(
HFileWriterV2.KEY_VALUE_VERSION);
boolean includeMemstoreTS = keyValueFormatVersion != null && boolean includeMemstoreTS = keyValueFormatVersion != null &&
Bytes.toInt(keyValueFormatVersion) > 0; Bytes.toInt(keyValueFormatVersion) > 0;
View File
@ -60,7 +60,8 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
/** /**
* Testing writing a version 3 {@link HFile}. * Testing writing a version 3 {@link HFile}. This is a low-level test written
* during the development of {@link HFileWriterV3}.
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@Category({IOTests.class, SmallTests.class}) @Category({IOTests.class, SmallTests.class})
@ -119,7 +120,8 @@ public class TestHFileWriterV3 {
.withBlockSize(4096) .withBlockSize(4096)
.withIncludesTags(useTags) .withIncludesTags(useTags)
.withCompression(compressAlgo).build(); .withCompression(compressAlgo).build();
HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf)) HFileWriterV3 writer = (HFileWriterV3)
new HFileWriterV3.WriterFactoryV3(conf, new CacheConfig(conf))
.withPath(fs, hfilePath) .withPath(fs, hfilePath)
.withFileContext(context) .withFileContext(context)
.withComparator(KeyValue.COMPARATOR) .withComparator(KeyValue.COMPARATOR)
@ -204,7 +206,8 @@ public class TestHFileWriterV3 {
// File info // File info
FileInfo fileInfo = new FileInfo(); FileInfo fileInfo = new FileInfo();
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION); byte [] keyValueFormatVersion = fileInfo.get(
HFileWriterV3.KEY_VALUE_VERSION);
boolean includeMemstoreTS = keyValueFormatVersion != null && boolean includeMemstoreTS = keyValueFormatVersion != null &&
Bytes.toInt(keyValueFormatVersion) > 0; Bytes.toInt(keyValueFormatVersion) > 0;
View File
@ -89,7 +89,8 @@ public class TestLazyDataBlockDecompression {
*/ */
private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
HFileContext cxt, int entryCount) throws IOException { HFileContext cxt, int entryCount) throws IOException {
HFile.Writer writer = new HFileWriterFactory(conf, cc) HFileWriterV2 writer = (HFileWriterV2)
new HFileWriterV2.WriterFactoryV2(conf, cc)
.withPath(fs, path) .withPath(fs, path)
.withFileContext(cxt) .withFileContext(cxt)
.create(); .create();
@ -117,7 +118,7 @@ public class TestLazyDataBlockDecompression {
long fileSize = fs.getFileStatus(path).getLen(); long fileSize = fs.getFileStatus(path).getLen();
FixedFileTrailer trailer = FixedFileTrailer trailer =
FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig, HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig,
fsdis.getHfs(), conf); fsdis.getHfs(), conf);
reader.loadFileInfo(); reader.loadFileInfo();
long offset = trailer.getFirstDataBlockOffset(), long offset = trailer.getFirstDataBlockOffset(),
View File
@ -55,6 +55,7 @@ public class TestPrefetch {
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration(); conf = TEST_UTIL.getConfiguration();
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
fs = HFileSystem.get(conf); fs = HFileSystem.get(conf);
CacheConfig.blockCacheDisabled = false; CacheConfig.blockCacheDisabled = false;
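For reference, the prefetch-on-open flow this test exercises looks roughly like the sketch below; the reader type and the home of prefetchComplete() differ between the two sides of this revert, and Thread.sleep needs InterruptedException handling in real code:

conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
HFile.Reader reader = HFile.createReader(fs, storeFilePath, new CacheConfig(conf), conf);
while (!reader.prefetchComplete()) {
  Thread.sleep(100); // block prefetch runs in the background after open
}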
@ -69,9 +70,10 @@ public class TestPrefetch {
private void readStoreFile(Path storeFilePath) throws Exception { private void readStoreFile(Path storeFilePath) throws Exception {
// Open the file // Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf); HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
storeFilePath, cacheConf, conf);
while (!reader.prefetchComplete()) { while (!((HFileReaderV3)reader).prefetchComplete()) {
// Sleep for a bit // Sleep for a bit
Thread.sleep(1000); Thread.sleep(1000);
} }
View File
@ -37,7 +37,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
/** /**
* Test {@link HFileScanner#reseekTo(org.apache.hadoop.hbase.Cell)} * Test {@link HFileScanner#reseekTo(byte[])}
*/ */
@Category({IOTests.class, SmallTests.class}) @Category({IOTests.class, SmallTests.class})
public class TestReseekTo { public class TestReseekTo {
View File
@ -73,6 +73,11 @@ public class TestSeekTo extends HBaseTestCase {
Path makeNewFile(TagUsage tagUsage) throws IOException { Path makeNewFile(TagUsage tagUsage) throws IOException {
Path ncTFile = new Path(testDir, "basic.hfile"); Path ncTFile = new Path(testDir, "basic.hfile");
if (tagUsage != TagUsage.NO_TAG) {
conf.setInt("hfile.format.version", 3);
} else {
conf.setInt("hfile.format.version", 2);
}
FSDataOutputStream fout = this.fs.create(ncTFile); FSDataOutputStream fout = this.fs.create(ncTFile);
int blocksize = toKV("a", tagUsage).getLength() * 3; int blocksize = toKV("a", tagUsage).getLength() * 3;
HFileContext context = new HFileContextBuilder().withBlockSize(blocksize) HFileContext context = new HFileContextBuilder().withBlockSize(blocksize)
@ -137,7 +142,7 @@ public class TestSeekTo extends HBaseTestCase {
@Test @Test
public void testSeekBeforeWithReSeekTo() throws Exception { public void testSeekBeforeWithReSeekTo() throws Exception {
testSeekBeforeInternals(TagUsage.NO_TAG); testSeekBeforeWithReSeekToInternals(TagUsage.NO_TAG);
testSeekBeforeWithReSeekToInternals(TagUsage.ONLY_TAG); testSeekBeforeWithReSeekToInternals(TagUsage.ONLY_TAG);
testSeekBeforeWithReSeekToInternals(TagUsage.PARTIAL_TAG); testSeekBeforeWithReSeekToInternals(TagUsage.PARTIAL_TAG);
} }
@ -227,7 +232,7 @@ public class TestSeekTo extends HBaseTestCase {
@Test @Test
public void testSeekTo() throws Exception { public void testSeekTo() throws Exception {
testSeekBeforeInternals(TagUsage.NO_TAG); testSeekToInternals(TagUsage.NO_TAG);
testSeekToInternals(TagUsage.ONLY_TAG); testSeekToInternals(TagUsage.ONLY_TAG);
testSeekToInternals(TagUsage.PARTIAL_TAG); testSeekToInternals(TagUsage.PARTIAL_TAG);
} }
@ -257,7 +262,7 @@ public class TestSeekTo extends HBaseTestCase {
@Test @Test
public void testBlockContainingKey() throws Exception { public void testBlockContainingKey() throws Exception {
testSeekBeforeInternals(TagUsage.NO_TAG); testBlockContainingKeyInternals(TagUsage.NO_TAG);
testBlockContainingKeyInternals(TagUsage.ONLY_TAG); testBlockContainingKeyInternals(TagUsage.ONLY_TAG);
testBlockContainingKeyInternals(TagUsage.PARTIAL_TAG); testBlockContainingKeyInternals(TagUsage.PARTIAL_TAG);
} }
View File
@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl; import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Compressor;
@ -602,9 +602,8 @@ public class DataBlockEncodingTool {
// run the utilities // run the utilities
DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName); DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
int majorVersion = reader.getHFileVersion(); int majorVersion = reader.getHFileVersion();
comp.useHBaseChecksum = majorVersion > 2 || comp.useHBaseChecksum = majorVersion > 2
(majorVersion == 2 && || (majorVersion == 2 && reader.getHFileMinorVersion() >= HFileReaderV2.MINOR_VERSION_WITH_CHECKSUM);
reader.getHFileMinorVersion() >= HFileReaderImpl.MINOR_VERSION_WITH_CHECKSUM);
comp.checkStatistics(scanner, kvLimit); comp.checkStatistics(scanner, kvLimit);
if (doVerify) { if (doVerify) {
comp.verifyCodecs(scanner, kvLimit); comp.verifyCodecs(scanner, kvLimit);
View File
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2; import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@ -220,7 +221,7 @@ public class TestCacheOnWriteInSchema {
BlockCache cache = cacheConf.getBlockCache(); BlockCache cache = cacheConf.getBlockCache();
StoreFile sf = new StoreFile(fs, path, conf, cacheConf, StoreFile sf = new StoreFile(fs, path, conf, cacheConf,
BloomType.ROWCOL); BloomType.ROWCOL);
HFile.Reader reader = sf.createReader().getHFileReader(); HFileReaderV2 reader = (HFileReaderV2) sf.createReader().getHFileReader();
try { try {
// Open a scanner with (on read) caching disabled // Open a scanner with (on read) caching disabled
HFileScanner scanner = reader.getScanner(false, false); HFileScanner scanner = reader.getScanner(false, false);