diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
index cd1b0b647f3..ff8ed194d58 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV3;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV3;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
@@ -46,8 +46,8 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
static {
// These log level changes are only useful when running on a localhost
// cluster.
- Logger.getLogger(HFileReaderImpl.class).setLevel(Level.TRACE);
- Logger.getLogger(HFileWriterImpl.class).setLevel(Level.TRACE);
+ Logger.getLogger(HFileReaderV3.class).setLevel(Level.TRACE);
+ Logger.getLogger(HFileWriterV3.class).setLevel(Level.TRACE);
Logger.getLogger(SecureProtobufLogReader.class).setLevel(Level.TRACE);
Logger.getLogger(SecureProtobufLogWriter.class).setLevel(Level.TRACE);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
new file mode 100644
index 00000000000..8c1e7b95651
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -0,0 +1,352 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+
+/**
+ * Common functionality needed by all versions of {@link HFile} readers.
+ */
+@InterfaceAudience.Private
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
+public abstract class AbstractHFileReader
+ implements HFile.Reader, Configurable {
+ /** Stream to read from. Does checksum verification in the file system */
+ protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
+
+ /** The file system stream of the underlying {@link HFile} that
+ * does not do checksum verification in the file system */
+ protected FSDataInputStream istreamNoFsChecksum; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
+
+ /** Data block index reader keeping the root data index in memory */
+ protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
+
+ /** Meta block index reader -- always single level */
+ protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
+
+ protected final FixedFileTrailer trailer;
+
+ /** Filled when we read in the trailer. */
+ protected final Compression.Algorithm compressAlgo;
+
+ /**
+ * What kind of data block encoding should be used while reading, writing,
+ * and handling cache.
+ */
+ protected HFileDataBlockEncoder dataBlockEncoder =
+ NoOpDataBlockEncoder.INSTANCE;
+
+ /** Last key in the file. Filled in when we read in the file info */
+ protected byte [] lastKey = null;
+
+ /** Average key length read from file info */
+ protected int avgKeyLen = -1;
+
+ /** Average value length read from file info */
+ protected int avgValueLen = -1;
+
+ /** Key comparator */
+ protected KVComparator comparator = new KVComparator();
+
+ /** Size of this file. */
+ protected final long fileSize;
+
+ /** Block cache configuration. */
+ protected final CacheConfig cacheConf;
+
+ /** Path of file */
+ protected final Path path;
+
+ /** File name to be used for block names */
+ protected final String name;
+
+ protected FileInfo fileInfo;
+
+ /** The filesystem used for accessing data */
+ protected HFileSystem hfs;
+
+ protected Configuration conf;
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
+ protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
+ final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
+ final Configuration conf) {
+ this.trailer = trailer;
+ this.compressAlgo = trailer.getCompressionCodec();
+ this.cacheConf = cacheConf;
+ this.fileSize = fileSize;
+ this.path = path;
+ this.name = path.getName();
+ this.hfs = hfs; // URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD
+ this.conf = conf;
+ }
+
+ @SuppressWarnings("serial")
+ public static class BlockIndexNotLoadedException
+ extends IllegalStateException {
+ public BlockIndexNotLoadedException() {
+ // Add a message in case anyone relies on it as opposed to class name.
+ super("Block index not loaded");
+ }
+ }
+
+ protected String toStringFirstKey() {
+ return KeyValue.keyToString(getFirstKey());
+ }
+
+ protected String toStringLastKey() {
+ return KeyValue.keyToString(getLastKey());
+ }
+
+ public abstract boolean isFileInfoLoaded();
+
+ @Override
+ public String toString() {
+ return "reader=" + path.toString() +
+ (!isFileInfoLoaded()? "":
+ ", compression=" + compressAlgo.getName() +
+ ", cacheConf=" + cacheConf +
+ ", firstKey=" + toStringFirstKey() +
+ ", lastKey=" + toStringLastKey()) +
+ ", avgKeyLen=" + avgKeyLen +
+ ", avgValueLen=" + avgValueLen +
+ ", entries=" + trailer.getEntryCount() +
+ ", length=" + fileSize;
+ }
+
+ @Override
+ public long length() {
+ return fileSize;
+ }
+
+ /**
+ * Create a Scanner on this file. No seeks or reads are done on creation. Call
+ * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+ * nothing to clean up in a Scanner. Letting go of your references to the
+ * scanner is sufficient. NOTE: Do not use this overload of getScanner for
+ * compactions.
+ *
+ * @param cacheBlocks True if we should cache blocks read in by this scanner.
+ * @param pread Use positional read rather than seek+read if true (pread is
+ * better for random reads, seek+read is better for scanning).
+ * @return Scanner on this file.
+ */
+ @Override
+ public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
+ return getScanner(cacheBlocks, pread, false);
+ }
+
+ /**
+ * @return the first key in the file. May be null if file has no entries. Note
+ * that this is not the first row key, but rather the byte form of the
+ * first KeyValue.
+ */
+ @Override
+ public byte [] getFirstKey() {
+ if (dataBlockIndexReader == null) {
+ throw new BlockIndexNotLoadedException();
+ }
+ return dataBlockIndexReader.isEmpty() ? null
+ : dataBlockIndexReader.getRootBlockKey(0);
+ }
+
+ /**
+ * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
+ * patch goes in to eliminate {@link KeyValue} here.
+ *
+ * @return the first row key, or null if the file is empty.
+ */
+ @Override
+ public byte[] getFirstRowKey() {
+ byte[] firstKey = getFirstKey();
+ if (firstKey == null)
+ return null;
+ return KeyValue.createKeyValueFromKey(firstKey).getRow();
+ }
+
+ /**
+ * TODO left from {@link HFile} version 1: move this to StoreFile after
+ * Ryan's patch goes in to eliminate {@link KeyValue} here.
+ *
+ * @return the last row key, or null if the file is empty.
+ */
+ @Override
+ public byte[] getLastRowKey() {
+ byte[] lastKey = getLastKey();
+ if (lastKey == null)
+ return null;
+ return KeyValue.createKeyValueFromKey(lastKey).getRow();
+ }
+
+ /** @return number of KV entries in this HFile */
+ @Override
+ public long getEntries() {
+ return trailer.getEntryCount();
+ }
+
+ /** @return comparator */
+ @Override
+ public KVComparator getComparator() {
+ return comparator;
+ }
+
+ /** @return compression algorithm */
+ @Override
+ public Compression.Algorithm getCompressionAlgorithm() {
+ return compressAlgo;
+ }
+
+ /**
+ * @return the total heap size of data and meta block indexes in bytes. Does
+ * not take into account non-root blocks of a multilevel data index.
+ */
+ public long indexSize() {
+ return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
+ + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
+ : 0);
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
+ return dataBlockIndexReader;
+ }
+
+ @Override
+ public FixedFileTrailer getTrailer() {
+ return trailer;
+ }
+
+ @Override
+ public FileInfo loadFileInfo() throws IOException {
+ return fileInfo;
+ }
+
+ /**
+ * An exception thrown when an operation requiring a scanner to be seeked
+ * is invoked on a scanner that is not seeked.
+ */
+ @SuppressWarnings("serial")
+ public static class NotSeekedException extends IllegalStateException {
+ public NotSeekedException() {
+ super("Not seeked to a key/value");
+ }
+ }
+
+ protected static abstract class Scanner implements HFileScanner {
+ protected ByteBuffer blockBuffer;
+
+ protected boolean cacheBlocks;
+ protected final boolean pread;
+ protected final boolean isCompaction;
+
+ protected int currKeyLen;
+ protected int currValueLen;
+ protected int currMemstoreTSLen;
+ protected long currMemstoreTS;
+
+ protected int blockFetches;
+
+ protected final HFile.Reader reader;
+
+ public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ this.reader = reader;
+ this.cacheBlocks = cacheBlocks;
+ this.pread = pread;
+ this.isCompaction = isCompaction;
+ }
+
+ @Override
+ public boolean isSeeked(){
+ return blockBuffer != null;
+ }
+
+ @Override
+ public String toString() {
+ return "HFileScanner for reader " + String.valueOf(getReader());
+ }
+
+ protected void assertSeeked() {
+ if (!isSeeked())
+ throw new NotSeekedException();
+ }
+
+ @Override
+ public int seekTo(byte[] key) throws IOException {
+ return seekTo(key, 0, key.length);
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key) throws IOException {
+ return seekBefore(key, 0, key.length);
+ }
+
+ @Override
+ public int reseekTo(byte[] key) throws IOException {
+ return reseekTo(key, 0, key.length);
+ }
+
+ @Override
+ public HFile.Reader getReader() {
+ return reader;
+ }
+ }
+
+ /** For testing */
+ abstract HFileBlock.FSReader getUncachedBlockReader();
+
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public DataBlockEncoding getDataBlockEncoding() {
+ return dataBlockEncoder.getDataBlockEncoding();
+ }
+
+ public abstract int getMajorVersion();
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
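
Aside, not part of the patch: a minimal reader-side sketch showing how the functionality collected in AbstractHFileReader is reached through the public HFile API. HFile.createReader, loadFileInfo(), getScanner(cacheBlocks, pread) and seekTo() are entry points from this codebase; the file path argument is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public class HFileReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[0]); // path to an existing HFile
        HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
        try {
          reader.loadFileInfo();                  // parses the file-info block
          // No block caching, positional reads: suited to a one-off scan.
          HFileScanner scanner = reader.getScanner(false, true);
          if (scanner.seekTo()) {                 // position at the first cell
            do {
              System.out.println(scanner.getKeyValue());
            } while (scanner.next());
          }
        } finally {
          reader.close();
        }
      }
    }
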
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
new file mode 100644
index 00000000000..52491e6b7cc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Common functionality needed by all versions of {@link HFile} writers.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractHFileWriter implements HFile.Writer {
+
+ /** The Cell previously appended. Becomes the last cell in the file. */
+ protected Cell lastCell = null;
+
+ /** FileSystem stream to write into. */
+ protected FSDataOutputStream outputStream;
+
+ /** True if we opened the outputStream (and so will close it). */
+ protected final boolean closeOutputStream;
+
+ /** A "file info" block: a key-value map of file-wide metadata. */
+ protected FileInfo fileInfo = new HFile.FileInfo();
+
+ /** Total # of key/value entries, i.e. how many times add() was called. */
+ protected long entryCount = 0;
+
+ /** Used for calculating the average key length. */
+ protected long totalKeyLength = 0;
+
+ /** Used for calculating the average value length. */
+ protected long totalValueLength = 0;
+
+ /** Total uncompressed bytes, maybe calculate a compression ratio later. */
+ protected long totalUncompressedBytes = 0;
+
+ /** Key comparator. Used to ensure we write in order. */
+ protected final KVComparator comparator;
+
+ /** Meta block names. */
+ protected List<byte[]> metaNames = new ArrayList<byte[]>();
+
+ /** {@link Writable}s representing meta block data. */
+ protected List<Writable> metaData = new ArrayList<Writable>();
+
+ /**
+ * First cell in a block.
+ * This reference should be short-lived since we write hfiles in a burst.
+ */
+ protected Cell firstCellInBlock = null;
+
+ /** May be null if we were passed a stream. */
+ protected final Path path;
+
+
+ /** Cache configuration for caching data on write. */
+ protected final CacheConfig cacheConf;
+
+ /**
+ * Name for this object used when logging or in toString. Is either
+ * the result of a toString on stream or else name of passed file Path.
+ */
+ protected final String name;
+
+ /**
+ * The data block encoding which will be used.
+ * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
+ */
+ protected final HFileDataBlockEncoder blockEncoder;
+
+ protected final HFileContext hFileContext;
+
+ public AbstractHFileWriter(CacheConfig cacheConf,
+ FSDataOutputStream outputStream, Path path,
+ KVComparator comparator, HFileContext fileContext) {
+ this.outputStream = outputStream;
+ this.path = path;
+ this.name = path != null ? path.getName() : outputStream.toString();
+ this.hFileContext = fileContext;
+ DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
+ if (encoding != DataBlockEncoding.NONE) {
+ this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
+ } else {
+ this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
+ }
+ this.comparator = comparator != null ? comparator
+ : KeyValue.COMPARATOR;
+
+ closeOutputStream = path != null;
+ this.cacheConf = cacheConf;
+ }
+
+ /**
+ * Add last bits of metadata to file info before it is written out.
+ */
+ protected void finishFileInfo() throws IOException {
+ if (lastCell != null) {
+ // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
+ // byte buffer. Won't take a tuple.
+ byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
+ fileInfo.append(FileInfo.LASTKEY, lastKey, false);
+ }
+
+ // Average key length.
+ int avgKeyLen =
+ entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
+ fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
+
+ // Average value length.
+ int avgValueLen =
+ entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
+ fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
+
+ fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
+ false);
+ }
+
+ /**
+ * Add to the file info. All added key/value pairs can be obtained using
+ * {@link HFile.Reader#loadFileInfo()}.
+ *
+ * @param k Key
+ * @param v Value
+ * @throws IOException in case the key or the value are invalid
+ */
+ @Override
+ public void appendFileInfo(final byte[] k, final byte[] v)
+ throws IOException {
+ fileInfo.append(k, v, true);
+ }
+
+ /**
+ * Sets the file info offset in the trailer, finishes up populating fields in
+ * the file info, and writes the file info into the given data output. The
+ * reason the data output is not always {@link #outputStream} is that we store
+ * file info as a block in version 2.
+ *
+ * @param trailer fixed file trailer
+ * @param out the data output to write the file info to
+ * @throws IOException
+ */
+ protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
+ throws IOException {
+ trailer.setFileInfoOffset(outputStream.getPos());
+ finishFileInfo();
+ fileInfo.write(out);
+ }
+
+ /**
+ * Checks that the given Cell's key does not violate the key order.
+ *
+ * @param cell Cell whose key to check.
+ * @return true if the key is duplicate
+ * @throws IOException if the key or the key order is wrong
+ */
+ protected boolean checkKey(final Cell cell) throws IOException {
+ boolean isDuplicateKey = false;
+
+ if (cell == null) {
+ throw new IOException("Key cannot be null or empty");
+ }
+ if (lastCell != null) {
+ int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
+
+ if (keyComp > 0) {
+ throw new IOException("Added a key not lexically larger than"
+ + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
+ } else if (keyComp == 0) {
+ isDuplicateKey = true;
+ }
+ }
+ return isDuplicateKey;
+ }
+
+ /** Checks the given value for validity. */
+ protected void checkValue(final byte[] value, final int offset,
+ final int length) throws IOException {
+ if (value == null) {
+ throw new IOException("Value cannot be null");
+ }
+ }
+
+ /**
+ * @return Path or null if we were passed a stream rather than a Path.
+ */
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public String toString() {
+ return "writer=" + (path != null ? path.toString() : null) + ", name="
+ + name + ", compression=" + hFileContext.getCompression().getName();
+ }
+
+ /**
+ * Sets remaining trailer fields, writes the trailer to disk, and optionally
+ * closes the output stream.
+ */
+ protected void finishClose(FixedFileTrailer trailer) throws IOException {
+ trailer.setMetaIndexCount(metaNames.size());
+ trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
+ trailer.setEntryCount(entryCount);
+ trailer.setCompressionCodec(hFileContext.getCompression());
+
+ trailer.serialize(outputStream);
+
+ if (closeOutputStream) {
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+
+ public static Compression.Algorithm compressionByName(String algoName) {
+ if (algoName == null)
+ return HFile.DEFAULT_COMPRESSION_ALGORITHM;
+ return Compression.getCompressionAlgorithmByName(algoName);
+ }
+
+ /** A helper method to create HFile output streams in constructors */
+ protected static FSDataOutputStream createOutputStream(Configuration conf,
+ FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
+ FsPermission perms = FSUtils.getFilePermissions(fs, conf,
+ HConstants.DATA_FILE_UMASK_KEY);
+ return FSUtils.create(fs, path, perms, favoredNodes);
+ }
+}
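
Aside, not part of the patch: a writer-side sketch. The append/close flow below ends up in AbstractHFileWriter's finishFileInfo() and finishClose() via the concrete V2/V3 writers. The factory and builder calls are real API from this codebase; the path and cell contents are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HFileWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
        HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
            .withPath(fs, new Path(args[0]))
            .withFileContext(context)
            .create();
        try {
          // Keys must arrive in sorted order; checkKey() above enforces this.
          writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
              Bytes.toBytes("q"), Bytes.toBytes("v")));
          writer.appendFileInfo(Bytes.toBytes("SKETCH_META"), Bytes.toBytes("demo"));
        } finally {
          writer.close(); // writes file info and trailer via finishClose()
        }
      }
    }
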
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index 3dcfc9b67a8..56510f06c5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -238,7 +238,7 @@ public class FixedFileTrailer {
BlockType.TRAILER.readAndCheck(inputStream);
if (majorVersion > 2
- || (majorVersion == 2 && minorVersion >= HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION)) {
+ || (majorVersion == 2 && minorVersion >= HFileReaderV2.PBUF_TRAILER_MINOR_VERSION)) {
deserializeFromPB(inputStream);
} else {
deserializeFromWritable(inputStream);
@@ -611,9 +611,7 @@ public class FixedFileTrailer {
}
public byte[] getEncryptionKey() {
- // This is a v3 feature but if reading a v2 file the encryptionKey will just be null which
- // if fine for this feature.
- expectAtLeastMajorVersion(2);
+ expectAtLeastMajorVersion(3);
return encryptionKey;
}
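
Aside, not part of the patch: with the stricter check restored above, getEncryptionKey() now throws unless the trailer is at least major version 3, so callers that may encounter v2 files need a guard along these lines (illustrative sketch; reader and trailer come from the surrounding API):

    FixedFileTrailer trailer = reader.getTrailer();
    byte[] encryptionKey = null;
    if (trailer.getMajorVersion() >= 3) {
      // v3 is the first format that can carry an encryption key;
      // the key may still be null for unencrypted v3 files.
      encryptionKey = trailer.getEncryptionKey();
    }
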
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 09233a2d3d0..610fe7fc60b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -198,8 +197,6 @@ public class HFile {
/** API required to write an {@link HFile} */
public interface Writer extends Closeable {
- /** Max memstore (mvcc) timestamp in FileInfo */
- public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
/** Add an element to the file info map. */
void appendFileInfo(byte[] key, byte[] value) throws IOException;
@@ -297,7 +294,7 @@ public class HFile {
"filesystem/path or path");
}
if (path != null) {
- ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
+ ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
}
return createWriter(fs, path, ostream,
comparator, fileContext);
@@ -336,12 +333,9 @@ public class HFile {
int version = getFormatVersion(conf);
switch (version) {
case 2:
- throw new IllegalArgumentException("This should never happen. " +
- "Did you change hfile.format.version to read v2? This version of the software writes v3" +
- " hfiles only (but it can read v2 files without having to update hfile.format.version " +
- "in hbase-site.xml)");
+ return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
case 3:
- return new HFileWriterFactory(conf, cacheConf);
+ return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
default:
throw new IllegalArgumentException("Cannot create writer for HFile " +
"format version " + version);
@@ -446,18 +440,6 @@ public class HFile {
* Return the file context of the HFile this reader belongs to
*/
HFileContext getFileContext();
-
- boolean shouldIncludeMemstoreTS();
-
- boolean isDecodeMemstoreTS();
-
- DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
-
- @VisibleForTesting
- HFileBlock.FSReader getUncachedBlockReader();
-
- @VisibleForTesting
- boolean prefetchComplete();
}
/**
@@ -481,10 +463,9 @@ public class HFile {
trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
switch (trailer.getMajorVersion()) {
case 2:
- LOG.debug("Opening HFile v2 with v3 reader");
- // Fall through.
+ return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
case 3 :
- return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
+ return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
default:
throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
}
@@ -508,7 +489,6 @@ public class HFile {
* @return A version specific Hfile Reader
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
*/
- @SuppressWarnings("resource")
public static Reader createReader(FileSystem fs, Path path,
FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
throws IOException {
@@ -874,18 +854,6 @@ public class HFile {
}
}
-
- public static void checkHFileVersion(final Configuration c) {
- int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
- if (version < MAX_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
- throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY +
- " (in your hbase-*.xml files) is " + version + " which does not match " +
- MAX_FORMAT_VERSION +
- "; are you running with a configuration from an older or newer hbase install (an " +
- "incompatible hbase-default.xml or hbase-site.xml on your CLASSPATH)?");
- }
- }
-
public static void main(String[] args) throws Exception {
// delegate to preserve old behavior
HFilePrettyPrinter.main(args);
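
Aside, not part of the patch: the restored switch above keys off hfile.format.version, so both formats are once again selectable through configuration. A minimal sketch (HFile.FORMAT_VERSION_KEY is the real constant; everything else is illustrative):

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(HFile.FORMAT_VERSION_KEY, 2); // "hfile.format.version"
    // With this patch the factory hands back HFileWriterV2.WriterFactoryV2
    // for version 2 and HFileWriterV3.WriterFactoryV3 for version 3,
    // instead of rejecting version 2 outright.
    HFile.WriterFactory factory = HFile.getWriterFactory(conf, new CacheConfig(conf));
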
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index a64bb948cdf..411594183f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1256,40 +1256,13 @@ public class HFileBlock implements Cacheable {
/** Get the default decoder for blocks from this file. */
HFileBlockDecodingContext getDefaultBlockDecodingContext();
-
- void setIncludesMemstoreTS(boolean includesMemstoreTS);
- void setDataBlockEncoder(HFileDataBlockEncoder encoder);
}
/**
- * We always prefetch the header of the next block, so that we know its
- * on-disk size in advance and can read it in one operation.
+ * A common implementation of some methods of {@link FSReader} and some
+ * tools for implementing HFile format version-specific block readers.
*/
- private static class PrefetchedHeader {
- long offset = -1;
- byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
- final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
- }
-
- /** Reads version 2 blocks from the filesystem. */
- static class FSReaderImpl implements FSReader {
- /** The file system stream of the underlying {@link HFile} that
- * does or doesn't do checksum validations in the filesystem */
- protected FSDataInputStreamWrapper streamWrapper;
-
- private HFileBlockDecodingContext encodedBlockDecodingCtx;
-
- /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
- private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
-
- private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
- new ThreadLocal<PrefetchedHeader>() {
- @Override
- public PrefetchedHeader initialValue() {
- return new PrefetchedHeader();
- }
- };
-
+ private abstract static class AbstractFSReader implements FSReader {
/** Compression algorithm used by the {@link HFile} */
/** The size of the file we are reading from, or -1 if unknown. */
@@ -1311,31 +1284,18 @@ public class HFileBlock implements Cacheable {
protected HFileContext fileContext;
- public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
- HFileContext fileContext) throws IOException {
+ public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
+ throws IOException {
this.fileSize = fileSize;
this.hfs = hfs;
this.path = path;
this.fileContext = fileContext;
this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
-
- this.streamWrapper = stream;
- // Older versions of HBase didn't support checksum.
- this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
- defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
- encodedBlockDecodingCtx = defaultDecodingCtx;
}
- /**
- * A constructor that reads files with the latest minor version.
- * This is used by unit tests only.
- */
- FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
- throws IOException {
- this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
- }
-
- public BlockIterator blockRange(final long startOffset, final long endOffset) {
+ @Override
+ public BlockIterator blockRange(final long startOffset,
+ final long endOffset) {
final FSReader owner = this; // handle for inner class
return new BlockIterator() {
private long offset = startOffset;
@@ -1432,6 +1392,56 @@ public class HFileBlock implements Cacheable {
return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
}
+ }
+
+ /**
+ * We always prefetch the header of the next block, so that we know its
+ * on-disk size in advance and can read it in one operation.
+ */
+ private static class PrefetchedHeader {
+ long offset = -1;
+ byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
+ final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+ }
+
+ /** Reads version 2 blocks from the filesystem. */
+ static class FSReaderImpl extends AbstractFSReader {
+ /** The file system stream of the underlying {@link HFile} that
+ * does or doesn't do checksum validations in the filesystem */
+ protected FSDataInputStreamWrapper streamWrapper;
+
+ private HFileBlockDecodingContext encodedBlockDecodingCtx;
+
+ /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
+ private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
+
+ private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
+ new ThreadLocal<PrefetchedHeader>() {
+ @Override
+ public PrefetchedHeader initialValue() {
+ return new PrefetchedHeader();
+ }
+ };
+
+ public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
+ HFileContext fileContext) throws IOException {
+ super(fileSize, hfs, path, fileContext);
+ this.streamWrapper = stream;
+ // Older versions of HBase didn't support checksum.
+ this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
+ defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
+ encodedBlockDecodingCtx = defaultDecodingCtx;
+ }
+
+ /**
+ * A constructor that reads files with the latest minor version.
+ * This is used by unit tests only.
+ */
+ FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
+ throws IOException {
+ this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
+ }
+
/**
* Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
* little memory allocation as possible, using the provided on-disk size.
@@ -1672,11 +1682,11 @@ public class HFileBlock implements Cacheable {
return b;
}
- public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
+ void setIncludesMemstoreTS(boolean includesMemstoreTS) {
this.fileContext.setIncludesMvcc(includesMemstoreTS);
}
- public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
+ void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
}
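
Aside, not part of the patch: after this split, blockRange() lives in AbstractFSReader and is shared by all concrete readers. The sketch below mirrors how HFileReaderV2's constructor walks the load-on-open section with it (fsBlockReader, trailer and fileSize are the same fields used in the reader code later in this patch):

    HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
        trailer.getLoadOnOpenDataOffset(), fileSize - trailer.getTrailerSize());
    HFileBlock b;
    while ((b = blockIter.nextBlock()) != null) {
      System.out.println(b.getBlockType()); // ROOT_INDEX, META_INDEX, FILE_INFO, ...
    }
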
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 5b54807a2da..77266dff8ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -55,8 +55,8 @@ import org.apache.hadoop.util.StringUtils;
*
* Examples of how to use the block index writer can be found in
* {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
- * {@link HFileWriterImpl}. Examples of how to use the reader can be
- * found in {@link HFileWriterImpl} and TestHFileBlockIndex.
+ * {@link HFileWriterV2}. Examples of how to use the reader can be
+ * found in {@link HFileReaderV2} and TestHFileBlockIndex.
*/
@InterfaceAudience.Private
public class HFileBlockIndex {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
similarity index 70%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index a007f37b416..c0e3e91ba17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,15 +20,12 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.security.Key;
-import java.security.KeyException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -39,15 +35,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Cipher;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
@@ -58,63 +49,32 @@ import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
/**
- * Implementation that can handle all hfile versions of {@link HFile.Reader}.
+ * {@link HFile} reader for version 2.
*/
@InterfaceAudience.Private
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-public class HFileReaderImpl implements HFile.Reader, Configurable {
- // This class is HFileReaderV3 + HFileReaderV2 + AbstractHFileReader all squashed together into
- // one file. Ditto for all the HFileReader.ScannerV? implementations. I was running up against
- // the MaxInlineLevel limit because too many tiers involved reading from an hfile. Was also hard
- // to navigate the source code when so many classes participating in read.
- private static final Log LOG = LogFactory.getLog(HFileReaderImpl.class);
+public class HFileReaderV2 extends AbstractHFileReader {
- /** Data block index reader keeping the root data index in memory */
- private HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
+ private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
- /** Meta block index reader -- always single level */
- private HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
+ /** Minor versions in HFile V2 starting with this number have hbase checksums */
+ public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
+ /** HFile V2 minor version that does not support checksums */
+ public static final int MINOR_VERSION_NO_CHECKSUM = 0;
- private final FixedFileTrailer trailer;
-
- /** Filled when we read in the trailer. */
- private final Compression.Algorithm compressAlgo;
+ /** HFile minor version that introduced the protobuf-serialized file trailer */
+ public static final int PBUF_TRAILER_MINOR_VERSION = 2;
/**
- * What kind of data block encoding should be used while reading, writing,
- * and handling cache.
+ * The size of a (key length, value length) tuple that prefixes each entry in
+ * a data block.
*/
- private HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE;
+ public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
- /** Last key in the file. Filled in when we read in the file info */
- private byte [] lastKey = null;
-
- /** Average key length read from file info */
- private int avgKeyLen = -1;
-
- /** Average value length read from file info */
- private int avgValueLen = -1;
-
- /** Key comparator */
- private KVComparator comparator = new KVComparator();
-
- /** Size of this file. */
- private final long fileSize;
-
- /** Block cache configuration. */
- private final CacheConfig cacheConf;
-
- /** Path of file */
- private final Path path;
-
- /** File name to be used for block names */
- private final String name;
-
- private FileInfo fileInfo;
-
- private Configuration conf;
-
- private HFileContext hfileContext;
+ protected boolean includesMemstoreTS = false;
+ protected boolean decodeMemstoreTS = false;
+ protected boolean shouldIncludeMemstoreTS() {
+ return includesMemstoreTS;
+ }
/** Filesystem-level block reader. */
protected HFileBlock.FSReader fsBlockReader;
@@ -141,48 +101,34 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// the file. This version can read Writables version 1.
static final int MAX_MINOR_VERSION = 3;
- /**
- * We can read files whose major version is v2 IFF their minor version is at least 3.
- */
- private static final int MIN_V2_MINOR_VERSION_WITH_PB = 3;
-
/** Minor versions starting with this number have faked index key */
static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
+ protected HFileContext hfileContext;
+
/**
* Opens a HFile. You must load the index before you can use it by calling
* {@link #loadFileInfo()}.
- * @param path
- * Path to HFile.
- * @param trailer
- * File trailer.
- * @param fsdis
- * input stream.
- * @param fileSize
- * Length of the stream.
- * @param cacheConf
- * Cache configuration.
+ *
+ * @param path Path to HFile.
+ * @param trailer File trailer.
+ * @param fsdis input stream.
+ * @param size Length of the stream.
+ * @param cacheConf Cache configuration.
* @param hfs
- * The file system.
* @param conf
- * Configuration
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
- public HFileReaderImpl(final Path path, FixedFileTrailer trailer,
- final FSDataInputStreamWrapper fsdis,
- final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
- final Configuration conf)
- throws IOException {
- this.trailer = trailer;
- this.compressAlgo = trailer.getCompressionCodec();
- this.cacheConf = cacheConf;
- this.fileSize = fileSize;
- this.path = path;
- this.name = path.getName();
+ public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
+ final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
+ final HFileSystem hfs, final Configuration conf) throws IOException {
+ super(path, trailer, size, cacheConf, hfs, conf);
this.conf = conf;
- checkFileVersion();
+ trailer.expectMajorVersion(getMajorVersion());
+ validateMinorVersion(path, trailer.getMinorVersion());
this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
- this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
+ HFileBlock.FSReaderImpl fsBlockReaderV2 =
+ new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
+ this.fsBlockReader = fsBlockReaderV2; // upcast
// Comparator class name is stored in the trailer in version 2.
comparator = trailer.createComparator();
@@ -193,7 +139,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Parse load-on-open data.
- HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
+ HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
trailer.getLoadOnOpenDataOffset(),
fileSize - trailer.getTrailerSize());
@@ -212,22 +158,23 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
fileInfo = new FileInfo();
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
- this.hfileContext.setFileCreateTime(creationTimeBytes == null ?
- 0 : Bytes.toLong(creationTimeBytes));
+ this.hfileContext.setFileCreateTime(creationTimeBytes == null ? 0 : Bytes.toLong(creationTimeBytes));
lastKey = fileInfo.get(FileInfo.LASTKEY);
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
- byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
+ byte [] keyValueFormatVersion =
+ fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
includesMemstoreTS = keyValueFormatVersion != null &&
- Bytes.toInt(keyValueFormatVersion) == HFileWriterImpl.KEY_VALUE_VER_WITH_MEMSTORE;
- fsBlockReader.setIncludesMemstoreTS(includesMemstoreTS);
+ Bytes.toInt(keyValueFormatVersion) ==
+ HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
+ fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
if (includesMemstoreTS) {
- decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterImpl.MAX_MEMSTORE_TS_KEY)) > 0;
+ decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
}
// Read data block encoding algorithm name from file info.
dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
- fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
+ fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
// Store all other load-on-open blocks for further consumption.
HFileBlock b;
@@ -270,800 +217,38 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
});
}
+ }
- byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
- // max tag length is not present in the HFile means tags were not at all written to file.
- if (tmp != null) {
- hfileContext.setIncludesTags(true);
- tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
- if (tmp != null && Bytes.toBoolean(tmp)) {
- hfileContext.setCompressTags(true);
- }
- }
+ protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
+ HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
+ return new HFileContextBuilder()
+ .withIncludesMvcc(this.includesMemstoreTS)
+ .withCompression(this.compressAlgo)
+ .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
+ .build();
}
/**
- * File version check is a little sloppy. We read v3 files but can also read v2 files if their
- * content has been pb'd; files written with 0.98.
- */
- private void checkFileVersion() {
- int majorVersion = trailer.getMajorVersion();
- if (majorVersion == getMajorVersion()) return;
- int minorVersion = trailer.getMinorVersion();
- if (majorVersion == 2 && minorVersion >= MIN_V2_MINOR_VERSION_WITH_PB) return;
- // We can read v3 or v2 versions of hfile.
- throw new IllegalArgumentException("Invalid HFile version: major=" +
- trailer.getMajorVersion() + ", minor=" + trailer.getMinorVersion() + ": expected at least " +
- "major=2 and minor=" + MAX_MINOR_VERSION);
- }
-
- @SuppressWarnings("serial")
- public static class BlockIndexNotLoadedException extends IllegalStateException {
- public BlockIndexNotLoadedException() {
- // Add a message in case anyone relies on it as opposed to class name.
- super("Block index not loaded");
- }
- }
-
- private String toStringFirstKey() {
- return KeyValue.keyToString(getFirstKey());
- }
-
- private String toStringLastKey() {
- return KeyValue.keyToString(getLastKey());
- }
-
- @Override
- public String toString() {
- return "reader=" + path.toString() +
- (!isFileInfoLoaded()? "":
- ", compression=" + compressAlgo.getName() +
- ", cacheConf=" + cacheConf +
- ", firstKey=" + toStringFirstKey() +
- ", lastKey=" + toStringLastKey()) +
- ", avgKeyLen=" + avgKeyLen +
- ", avgValueLen=" + avgValueLen +
- ", entries=" + trailer.getEntryCount() +
- ", length=" + fileSize;
- }
-
- @Override
- public long length() {
- return fileSize;
- }
-
- /**
- * @return the first key in the file. May be null if file has no entries. Note
- * that this is not the first row key, but rather the byte form of the
- * first KeyValue.
- */
- @Override
- public byte [] getFirstKey() {
- if (dataBlockIndexReader == null) {
- throw new BlockIndexNotLoadedException();
- }
- return dataBlockIndexReader.isEmpty() ? null
- : dataBlockIndexReader.getRootBlockKey(0);
- }
-
- /**
- * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
- * patch goes in to eliminate {@link KeyValue} here.
+ * Create a Scanner on this file. No seeks or reads are done on creation. Call
+ * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+ * nothing to clean up in a Scanner. Letting go of your references to the
+ * scanner is sufficient.
*
- * @return the first row key, or null if the file is empty.
+ * @param cacheBlocks True if we should cache blocks read in by this scanner.
+ * @param pread Use positional read rather than seek+read if true (pread is
+ * better for random reads, seek+read is better for scanning).
+ * @param isCompaction is scanner being used for a compaction?
+ * @return Scanner on this file.
*/
- @Override
- public byte[] getFirstRowKey() {
- byte[] firstKey = getFirstKey();
- return firstKey == null? null: KeyValue.createKeyValueFromKey(firstKey).getRow();
- }
-
- /**
- * TODO left from {@link HFile} version 1: move this to StoreFile after
- * Ryan's patch goes in to eliminate {@link KeyValue} here.
- *
- * @return the last row key, or null if the file is empty.
- */
- @Override
- public byte[] getLastRowKey() {
- byte[] lastKey = getLastKey();
- return lastKey == null? null: KeyValue.createKeyValueFromKey(lastKey).getRow();
- }
-
- /** @return number of KV entries in this HFile */
- @Override
- public long getEntries() {
- return trailer.getEntryCount();
- }
-
- /** @return comparator */
- @Override
- public KVComparator getComparator() {
- return comparator;
- }
-
- /** @return compression algorithm */
- @Override
- public Compression.Algorithm getCompressionAlgorithm() {
- return compressAlgo;
- }
-
- /**
- * @return the total heap size of data and meta block indexes in bytes. Does
- * not take into account non-root blocks of a multilevel data index.
- */
- public long indexSize() {
- return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
- + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
- : 0);
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
- return dataBlockIndexReader;
- }
-
- @Override
- public FixedFileTrailer getTrailer() {
- return trailer;
- }
-
- @Override
- public FileInfo loadFileInfo() throws IOException {
- return fileInfo;
- }
-
- /**
- * An exception thrown when an operation requiring a scanner to be seeked
- * is invoked on a scanner that is not seeked.
- */
- @SuppressWarnings("serial")
- public static class NotSeekedException extends IllegalStateException {
- public NotSeekedException() {
- super("Not seeked to a key/value");
- }
- }
-
- protected static class HFileScannerImpl implements HFileScanner {
- private ByteBuffer blockBuffer;
- protected final boolean cacheBlocks;
- protected final boolean pread;
- protected final boolean isCompaction;
- private int currKeyLen;
- private int currValueLen;
- private int currMemstoreTSLen;
- private long currMemstoreTS;
- // Updated but never read?
- protected volatile int blockFetches;
- protected final HFile.Reader reader;
- private int currTagsLen;
-
- protected HFileBlock block;
-
- /**
- * The next indexed key is to keep track of the indexed key of the next data block.
- * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
- * current data block is the last data block.
- *
- * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
- */
- protected Cell nextIndexedKey;
-
- public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
- final boolean pread, final boolean isCompaction) {
- this.reader = reader;
- this.cacheBlocks = cacheBlocks;
- this.pread = pread;
- this.isCompaction = isCompaction;
+ @Override
+ public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+ final boolean isCompaction) {
+ if (dataBlockEncoder.useEncodedScanner()) {
+ return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
+ hfileContext);
}
- @Override
- public boolean isSeeked(){
- return blockBuffer != null;
- }
-
- @Override
- public String toString() {
- return "HFileScanner for reader " + String.valueOf(getReader());
- }
-
- protected void assertSeeked() {
- if (!isSeeked())
- throw new NotSeekedException();
- }
-
- @Override
- public int seekTo(byte[] key) throws IOException {
- return seekTo(key, 0, key.length);
- }
-
- @Override
- public boolean seekBefore(byte[] key) throws IOException {
- return seekBefore(key, 0, key.length);
- }
-
- @Override
- public int reseekTo(byte[] key) throws IOException {
- return reseekTo(key, 0, key.length);
- }
-
- @Override
- public HFile.Reader getReader() {
- return reader;
- }
-
- protected int getCellBufSize() {
- int kvBufSize = KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
- if (this.reader.getFileContext().isIncludesTags()) {
- kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen;
- }
- return kvBufSize;
- }
-
- protected int getNextCellStartPosition() {
- int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
- + currMemstoreTSLen;
- if (this.reader.getFileContext().isIncludesTags()) {
- nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
- }
- return nextKvPos;
- }
-
- protected void readKeyValueLen() {
- blockBuffer.mark();
- currKeyLen = blockBuffer.getInt();
- currValueLen = blockBuffer.getInt();
- if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit()
- || currValueLen > blockBuffer.limit()) {
- throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen "
- + currValueLen + ". Block offset: "
- + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
- + blockBuffer.position() + " (without header).");
- }
- ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
- if (this.reader.getFileContext().isIncludesTags()) {
- // Read short as unsigned, high byte first
- currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
- if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
- throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
- + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
- + blockBuffer.position() + " (without header).");
- }
- ByteBufferUtils.skip(blockBuffer, currTagsLen);
- }
- readMvccVersion();
- blockBuffer.reset();
- }
-
- /**
- * Within a loaded block, seek looking for the last key that is smaller than
- * (or equal to?) the key we are interested in.
- * A note on the seekBefore: if you have seekBefore = true, AND the first
- * key in the block = key, then you'll get thrown exceptions. The caller has
- * to check for that case and load the previous block as appropriate.
- * @param key
- * the key to find
- * @param seekBefore
- * find the key before the given key in case of exact match.
- * @return 0 in case of an exact key match, 1 in case of an inexact match,
- * -2 in case of an inexact match and furthermore, the input key
- * less than the first key of current block(e.g. using a faked index
- * key)
- */
- protected int blockSeek(Cell key, boolean seekBefore) {
- int klen, vlen, tlen = 0;
- long memstoreTS = 0;
- int memstoreTSLen = 0;
- int lastKeyValueSize = -1;
- KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
- do {
- blockBuffer.mark();
- klen = blockBuffer.getInt();
- vlen = blockBuffer.getInt();
- if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
- || vlen > blockBuffer.limit()) {
- throw new IllegalStateException("Invalid klen " + klen + " or vlen "
- + vlen + ". Block offset: "
- + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
- + blockBuffer.position() + " (without header).");
- }
- ByteBufferUtils.skip(blockBuffer, klen + vlen);
- if (this.reader.getFileContext().isIncludesTags()) {
- // Read short as unsigned, high byte first
- tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
- if (tlen < 0 || tlen > blockBuffer.limit()) {
- throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
- + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
- + blockBuffer.position() + " (without header).");
- }
- ByteBufferUtils.skip(blockBuffer, tlen);
- }
- if (this.reader.shouldIncludeMemstoreTS()) {
- if (this.reader.isDecodeMemstoreTS()) {
- try {
- memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
- + blockBuffer.position());
- memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
- } catch (Exception e) {
- throw new RuntimeException("Error reading memstore timestamp", e);
- }
- } else {
- memstoreTS = 0;
- memstoreTSLen = 1;
- }
- }
- blockBuffer.reset();
- int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
- keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
- int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv);
-
- if (comp == 0) {
- if (seekBefore) {
- if (lastKeyValueSize < 0) {
- throw new IllegalStateException("blockSeek with seekBefore "
- + "at the first key of the block: key="
- + CellUtil.getCellKeyAsString(key)
- + ", blockOffset=" + block.getOffset() + ", onDiskSize="
- + block.getOnDiskSizeWithHeader());
- }
- blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
- readKeyValueLen();
- return 1; // non exact match.
- }
- currKeyLen = klen;
- currValueLen = vlen;
- currTagsLen = tlen;
- if (this.reader.shouldIncludeMemstoreTS()) {
- currMemstoreTS = memstoreTS;
- currMemstoreTSLen = memstoreTSLen;
- }
- return 0; // indicate exact match
- } else if (comp < 0) {
- if (lastKeyValueSize > 0)
- blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
- readKeyValueLen();
- if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
- return HConstants.INDEX_KEY_MAGIC;
- }
- return 1;
- }
-
- // The size of this key/value tuple, including key/value length fields.
- lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
- // include tag length also if tags included with KV
- if (this.reader.getFileContext().isIncludesTags()) {
- lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
- }
- blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
- } while (blockBuffer.remaining() > 0);
-
- // Seek to the last key we successfully read. This will happen if this is
- // the last key/value pair in the file, in which case the following call
- // to next() has to return false.
- blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
- readKeyValueLen();
- return 1; // didn't exactly find it.
- }
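Reviewer note: blockSeek above walks cells laid out as (key length, value length, key, value, optional tags, optional mvcc timestamp). A minimal standalone sketch of that layout follows; the class and helper names are illustrative, not HBase API, and the vlong decoder is assumed to match Hadoop's WritableUtils encoding.

```java
import java.nio.ByteBuffer;

// Illustrative only: mirrors the layout blockSeek iterates, not HBase API.
final class CellLayoutSketch {
  /** Advances buf past one cell and returns its total size in bytes. */
  static int skipCell(ByteBuffer buf, boolean includesTags, boolean includesMvcc) {
    int start = buf.position();
    int klen = buf.getInt();                     // key length
    int vlen = buf.getInt();                     // value length
    buf.position(buf.position() + klen + vlen);  // skip key and value bytes
    if (includesTags) {
      // Tags length is a big-endian unsigned short, decoded as in blockSeek.
      int tlen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
      buf.position(buf.position() + tlen);
    }
    if (includesMvcc) {
      readVLong(buf);                            // memstore timestamp, 1..9 bytes
    }
    return buf.position() - start;
  }

  // Assumed to match Hadoop's WritableUtils vlong encoding.
  static long readVLong(ByteBuffer buf) {
    byte first = buf.get();
    int len = first >= -112 ? 1 : (first < -120 ? -119 - first : -111 - first);
    if (len == 1) {
      return first;
    }
    long v = 0;
    for (int i = 0; i < len - 1; i++) {
      v = (v << 8) | (buf.get() & 0xff);
    }
    return first < -120 ? ~v : v;  // multi-byte negative values are stored inverted
  }
}
```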
-
- @Override
- public Cell getNextIndexedKey() {
- return nextIndexedKey;
- }
-
- @Override
- public int seekTo(byte[] key, int offset, int length) throws IOException {
- // Always rewind to the first key of the block, because the given key
- // might be before or after the current key.
- return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
- }
-
- @Override
- public int reseekTo(byte[] key, int offset, int length) throws IOException {
- return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
- }
-
- @Override
- public int seekTo(Cell key) throws IOException {
- return seekTo(key, true);
- }
-
- @Override
- public int reseekTo(Cell key) throws IOException {
- int compared;
- if (isSeeked()) {
- compared = compareKey(reader.getComparator(), key);
- if (compared < 1) {
- // If the required key is less than or equal to current key, then
- // don't do anything.
- return compared;
- } else {
-        // The comparison with the NO_NEXT_INDEXED_KEY sentinel has to be checked
-        if (this.nextIndexedKey != null &&
-            (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader
-            .getComparator().compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
-          // The reader shall continue to scan the current data block instead of
-          // querying the block index, as long as it knows the target key is
-          // strictly smaller than the next indexed key or the current data block
-          // is the last data block.
- return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
- }
- }
- }
- // Don't rewind on a reseek operation, because reseek implies that we are
- // always going forward in the file.
- return seekTo(key, false);
- }
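Reviewer note: a hedged usage sketch of the seek/reseek contract above (keyA and keyB are hypothetical Cells with keyB >= keyA). reseekTo assumes forward-only movement, so it can stay inside the current data block whenever the target still sorts below nextIndexedKey, skipping the block-index lookup a full seekTo may pay for.

```java
HFileScanner scanner = reader.getScanner(true, true);
scanner.seekTo(keyA);    // full seek: may consult the block index
// ... consume cells with next() ...
scanner.reseekTo(keyB);  // keyB >= current key; stays in the current block
                         // while keyB < nextIndexedKey, else re-seeks
```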
-
- /**
- * An internal API function. Seek to the given key, optionally rewinding to
- * the first key of the block before doing the seek.
- *
- * @param key - a cell representing the key that we need to fetch
- * @param rewind whether to rewind to the first key of the block before
- * doing the seek. If this is false, we are assuming we never go
- * back, otherwise the result is undefined.
- * @return -1 if the key is earlier than the first key of the file,
- *         0 if we are at the given key, 1 if we are past the given key,
- * -2 if the key is earlier than the first key of the file while
- * using a faked index key
- * @throws IOException
- */
- public int seekTo(Cell key, boolean rewind) throws IOException {
- HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
- BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
- cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
- if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
- // This happens if the key e.g. falls before the beginning of the file.
- return -1;
- }
- return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
- blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
- }
-
- @Override
- public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
- return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
- }
-
- @Override
- public boolean seekBefore(Cell key) throws IOException {
- HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
- cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction));
- if (seekToBlock == null) {
- return false;
- }
- ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
-
- if (reader.getComparator()
- .compareOnlyKeyPortion(
- new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
- firstKey.limit()), key) >= 0) {
- long previousBlockOffset = seekToBlock.getPrevBlockOffset();
-      // The key we are interested in must then be in the previous block.
- if (previousBlockOffset == -1) {
- // we have a 'problem', the key we want is the first of the file.
- return false;
- }
-
- // It is important that we compute and pass onDiskSize to the block
- // reader so that it does not have to read the header separately to
- // figure out the size.
- seekToBlock = reader.readBlock(previousBlockOffset,
- seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
- pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
- // TODO shortcut: seek forward in this block to the last key of the
- // block.
- }
- Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
- loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
- return true;
- }
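Reviewer note: seekBefore's contract, sketched under the assumption that key is a Cell already constructed by the caller:

```java
if (scanner.seekBefore(key)) {
  Cell previous = scanner.getKeyValue(); // greatest cell sorting strictly before key
} else {
  // key sorts at or before the first key of the file; there is no previous cell
}
```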
-
- /**
- * Scans blocks in the "scanned" section of the {@link HFile} until the next
- * data block is found.
- *
- * @return the next block, or null if there are no more data blocks
- * @throws IOException
- */
- protected HFileBlock readNextDataBlock() throws IOException {
- long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
- if (block == null)
- return null;
-
- HFileBlock curBlock = block;
-
- do {
- if (curBlock.getOffset() >= lastDataBlockOffset)
- return null;
-
- if (curBlock.getOffset() < 0) {
- throw new IOException("Invalid block file offset: " + block);
- }
-
- // We are reading the next block without block type validation, because
- // it might turn out to be a non-data block.
- curBlock = reader.readBlock(curBlock.getOffset()
- + curBlock.getOnDiskSizeWithHeader(),
- curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
- isCompaction, true, null, getEffectiveDataBlockEncoding());
- } while (!curBlock.getBlockType().isData());
-
- return curBlock;
- }
-
- public DataBlockEncoding getEffectiveDataBlockEncoding() {
- return this.reader.getEffectiveEncodingInCache(isCompaction);
- }
-
- @Override
- public Cell getKeyValue() {
- if (!isSeeked())
- return null;
-
- KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
- + blockBuffer.position(), getCellBufSize());
- if (this.reader.shouldIncludeMemstoreTS()) {
- ret.setSequenceId(currMemstoreTS);
- }
- return ret;
- }
-
- @Override
- public ByteBuffer getKey() {
- assertSeeked();
- return ByteBuffer.wrap(
- blockBuffer.array(),
- blockBuffer.arrayOffset() + blockBuffer.position()
- + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
- }
-
- public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
- return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
- blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
- }
-
- @Override
- public ByteBuffer getValue() {
- assertSeeked();
- return ByteBuffer.wrap(
- blockBuffer.array(),
- blockBuffer.arrayOffset() + blockBuffer.position()
- + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
- }
-
- protected void setNonSeekedState() {
- block = null;
- blockBuffer = null;
- currKeyLen = 0;
- currValueLen = 0;
- currMemstoreTS = 0;
- currMemstoreTSLen = 0;
- currTagsLen = 0;
- }
-
- /**
- * Go to the next key/value in the block section. Loads the next block if
- * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
- * be called.
- *
- * @return true if successfully navigated to the next key/value
- */
- @Override
- public boolean next() throws IOException {
- assertSeeked();
-
- try {
- blockBuffer.position(getNextCellStartPosition());
- } catch (IllegalArgumentException e) {
- LOG.error("Current pos = " + blockBuffer.position()
- + "; currKeyLen = " + currKeyLen + "; currValLen = "
- + currValueLen + "; block limit = " + blockBuffer.limit()
- + "; HFile name = " + reader.getName()
- + "; currBlock currBlockOffset = " + block.getOffset());
- throw e;
- }
-
- if (blockBuffer.remaining() <= 0) {
- long lastDataBlockOffset =
- reader.getTrailer().getLastDataBlockOffset();
-
- if (block.getOffset() >= lastDataBlockOffset) {
- setNonSeekedState();
- return false;
- }
-
- // read the next block
- HFileBlock nextBlock = readNextDataBlock();
- if (nextBlock == null) {
- setNonSeekedState();
- return false;
- }
-
- updateCurrBlock(nextBlock);
- return true;
- }
-
- // We are still in the same block.
- readKeyValueLen();
- return true;
- }
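Reviewer note: the canonical forward scan built on seekTo()/next(), as a hedged sketch:

```java
if (scanner.seekTo()) {              // false for a file with no data blocks
  do {
    Cell cell = scanner.getKeyValue();
    // ... process cell ...
  } while (scanner.next());          // transparently crosses data block boundaries
}
```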
-
- /**
- * Positions this scanner at the start of the file.
- *
- * @return false if empty file; i.e. a call to next would return false and
- * the current key and value are undefined.
- * @throws IOException
- */
- @Override
- public boolean seekTo() throws IOException {
- if (reader == null) {
- return false;
- }
-
- if (reader.getTrailer().getEntryCount() == 0) {
- // No data blocks.
- return false;
- }
-
- long firstDataBlockOffset =
- reader.getTrailer().getFirstDataBlockOffset();
- if (block != null && block.getOffset() == firstDataBlockOffset) {
- blockBuffer.rewind();
- readKeyValueLen();
- return true;
- }
-
- block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
- isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
- if (block.getOffset() < 0) {
- throw new IOException("Invalid block offset: " + block.getOffset());
- }
- updateCurrBlock(block);
- return true;
- }
-
- protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
- boolean rewind, Cell key, boolean seekBefore) throws IOException {
- if (block == null || block.getOffset() != seekToBlock.getOffset()) {
- updateCurrBlock(seekToBlock);
- } else if (rewind) {
- blockBuffer.rewind();
- }
-
- // Update the nextIndexedKey
- this.nextIndexedKey = nextIndexedKey;
- return blockSeek(key, seekBefore);
- }
-
- /**
- * Updates the current block to be the given {@link HFileBlock}. Seeks to
- * the first key/value pair.
- *
- * @param newBlock the block to make current
- */
- protected void updateCurrBlock(HFileBlock newBlock) {
- block = newBlock;
-
- // sanity check
- if (block.getBlockType() != BlockType.DATA) {
- throw new IllegalStateException("Scanner works only on data " +
- "blocks, got " + block.getBlockType() + "; " +
- "fileName=" + reader.getName() + ", " +
- "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " +
- "isCompaction=" + isCompaction);
- }
-
- blockBuffer = block.getBufferWithoutHeader();
- readKeyValueLen();
- blockFetches++;
-
- // Reset the next indexed key
- this.nextIndexedKey = null;
- }
-
- protected void readMvccVersion() {
- if (this.reader.shouldIncludeMemstoreTS()) {
- if (this.reader.isDecodeMemstoreTS()) {
- try {
- currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
- + blockBuffer.position());
- currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
- } catch (Exception e) {
- throw new RuntimeException("Error reading memstore timestamp", e);
- }
- } else {
- currMemstoreTS = 0;
- currMemstoreTSLen = 1;
- }
- }
- }
-
- protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
- ByteBuffer buffer = curBlock.getBufferWithoutHeader();
- // It is safe to manipulate this buffer because we own the buffer object.
- buffer.rewind();
- int klen = buffer.getInt();
- buffer.getInt();
- ByteBuffer keyBuff = buffer.slice();
- keyBuff.limit(klen);
- keyBuff.rewind();
- return keyBuff;
- }
-
- @Override
- public String getKeyString() {
- return Bytes.toStringBinary(blockBuffer.array(),
- blockBuffer.arrayOffset() + blockBuffer.position()
- + KEY_VALUE_LEN_SIZE, currKeyLen);
- }
-
- @Override
- public String getValueString() {
- return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
- + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
- currValueLen);
- }
-
- public int compareKey(KVComparator comparator, Cell key) {
- return comparator.compareOnlyKeyPortion(
- key,
- new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
- + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
- }
- }
-
- public Path getPath() {
- return path;
- }
-
- @Override
- public DataBlockEncoding getDataBlockEncoding() {
- return dataBlockEncoder.getDataBlockEncoding();
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /** Minor versions in HFile starting with this number have hbase checksums */
- public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
- /** The HFile minor version that does not support checksums */
- public static final int MINOR_VERSION_NO_CHECKSUM = 0;
-
- /** HFile minor version that introduced the protobuf file trailer */
- public static final int PBUF_TRAILER_MINOR_VERSION = 2;
-
- /**
- * The size of a (key length, value length) tuple that prefixes each entry in
- * a data block.
- */
- public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
-
- protected boolean includesMemstoreTS = false;
- protected boolean decodeMemstoreTS = false;
-
-
- public boolean isDecodeMemstoreTS() {
- return this.decodeMemstoreTS;
- }
-
- public boolean shouldIncludeMemstoreTS() {
- return includesMemstoreTS;
+ return new ScannerV2(this, cacheBlocks, pread, isCompaction);
}
/**
@@ -1094,16 +279,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// perform this check if cached block is a data block.
if (cachedBlock.getBlockType().isData() &&
!actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
- // This mismatch may happen if a Scanner, which is used for say a
+ // This mismatch may happen if a ScannerV2, which is used for say a
// compaction, tries to read an encoded block from the block cache.
- // The reverse might happen when an EncodedScanner tries to read
+ // The reverse might happen when an EncodedScannerV2 tries to read
// un-encoded blocks which were cached earlier.
//
// Because returning a data block with an implicit BlockType mismatch
// will cause the requesting scanner to throw, a disk read should be
// forced here. This will potentially cause a significant number of
// cache misses, so we should keep track of this as it might
- // justify the work on a CompoundScanner.
+ // justify the work on a CompoundScannerV2.
if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
!actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
// If the block is encoded but the encoding does not match the
@@ -1125,7 +310,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
return null;
}
-
/**
* @param metaBlockName
* @param cacheBlock Add block to cache, if found
@@ -1196,17 +380,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
throw new IOException("Requested block is out of range: " + dataBlockOffset +
", lastDataBlockOffset: " + trailer.getLastDataBlockOffset());
}
- // For any given block from any given file, synchronize reads for said
- // block.
+
+ // For any given block from any given file, synchronize reads for said block.
// Without a cache, this synchronizing is needless overhead, but really
// the other choice is to duplicate work (which the cache would prevent you
// from doing).
-
BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset);
-
boolean useLock = false;
IdLock.Entry lockEntry = null;
- TraceScope traceScope = Trace.startSpan("HFileReaderImpl.readBlock");
+ TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
try {
while (true) {
if (useLock) {
@@ -1350,23 +532,567 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
/** For testing */
- public HFileBlock.FSReader getUncachedBlockReader() {
+ @Override
+ HFileBlock.FSReader getUncachedBlockReader() {
return fsBlockReader;
}
+
+ protected abstract static class AbstractScannerV2
+ extends AbstractHFileReader.Scanner {
+ protected HFileBlock block;
+
+    /**
+     * Keeps track of the indexed key of the next data block. If this is
+     * HConstants.NO_NEXT_INDEXED_KEY, the current data block is the last data
+     * block. If this is null, the next indexed key has not been loaded yet.
+     */
+    protected Cell nextIndexedKey;
+
+    @Override
+    public Cell getNextIndexedKey() {
+      return nextIndexedKey;
+    }
+
+ public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ super(r, cacheBlocks, pread, isCompaction);
+ }
+
+ protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
+
+ protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
+ boolean rewind, Cell key, boolean seekBefore) throws IOException;
+
+ @Override
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ // Always rewind to the first key of the block, because the given key
+ // might be before or after the current key.
+ return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
+ }
+
+ @Override
+ public int reseekTo(byte[] key, int offset, int length) throws IOException {
+ return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
+ }
+
+ @Override
+ public int seekTo(Cell key) throws IOException {
+ return seekTo(key, true);
+ }
+
+ @Override
+ public int reseekTo(Cell key) throws IOException {
+ int compared;
+ if (isSeeked()) {
+ compared = compareKey(reader.getComparator(), key);
+ if (compared < 1) {
+ // If the required key is less than or equal to current key, then
+ // don't do anything.
+ return compared;
+ } else {
+          // The comparison with the NO_NEXT_INDEXED_KEY sentinel has to be checked
+          if (this.nextIndexedKey != null &&
+              (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader
+              .getComparator()
+              .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
+            // The reader shall continue to scan the current data block instead of
+            // querying the block index, as long as it knows the target key is
+            // strictly smaller than the next indexed key or the current data block
+            // is the last data block.
+ return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
+ }
+ }
+ }
+ // Don't rewind on a reseek operation, because reseek implies that we are
+ // always going forward in the file.
+ return seekTo(key, false);
+ }
+
+
+ /**
+ * An internal API function. Seek to the given key, optionally rewinding to
+ * the first key of the block before doing the seek.
+ *
+ * @param key - a cell representing the key that we need to fetch
+ * @param rewind whether to rewind to the first key of the block before
+ * doing the seek. If this is false, we are assuming we never go
+ * back, otherwise the result is undefined.
+ * @return -1 if the key is earlier than the first key of the file,
+     *         0 if we are at the given key, 1 if we are past the given key,
+ * -2 if the key is earlier than the first key of the file while
+ * using a faked index key
+ * @throws IOException
+ */
+ public int seekTo(Cell key, boolean rewind) throws IOException {
+ HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
+ BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
+ cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
+ if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
+ // This happens if the key e.g. falls before the beginning of the file.
+ return -1;
+ }
+ return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
+ blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
+ }
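Reviewer note: a sketch of how a caller might branch on the return contract documented above. HConstants.INDEX_KEY_MAGIC is assumed to be the -2 sentinel, and key is a hypothetical Cell.

```java
int result = scanner.seekTo(key, true);
if (result == -1) {
  // key sorts before the first key of the file
} else if (result == 0) {
  Cell exact = scanner.getKeyValue();      // positioned exactly on key
} else if (result == 1) {
  Cell floor = scanner.getKeyValue();      // greatest cell sorting before key
} else if (result == HConstants.INDEX_KEY_MAGIC) {
  // -2: key sorts before the first key of the block reached via a faked index key
}
```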
+
+ @Override
+ public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
+ return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
+ }
+
+ @Override
+ public boolean seekBefore(Cell key) throws IOException {
+ HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
+ cacheBlocks, pread, isCompaction,
+ ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction));
+ if (seekToBlock == null) {
+ return false;
+ }
+ ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+
+ if (reader.getComparator()
+ .compareOnlyKeyPortion(
+ new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
+ firstKey.limit()), key) >= 0) {
+ long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        // The key we are interested in must then be in the previous block.
+ if (previousBlockOffset == -1) {
+ // we have a 'problem', the key we want is the first of the file.
+ return false;
+ }
+
+ // It is important that we compute and pass onDiskSize to the block
+ // reader so that it does not have to read the header separately to
+ // figure out the size.
+ seekToBlock = reader.readBlock(previousBlockOffset,
+ seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
+ pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
+ // TODO shortcut: seek forward in this block to the last key of the
+ // block.
+ }
+ Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
+ loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
+ return true;
+ }
+
+ /**
+ * Scans blocks in the "scanned" section of the {@link HFile} until the next
+ * data block is found.
+ *
+ * @return the next block, or null if there are no more data blocks
+ * @throws IOException
+ */
+ protected HFileBlock readNextDataBlock() throws IOException {
+ long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+ if (block == null)
+ return null;
+
+ HFileBlock curBlock = block;
+
+ do {
+ if (curBlock.getOffset() >= lastDataBlockOffset)
+ return null;
+
+ if (curBlock.getOffset() < 0) {
+ throw new IOException("Invalid block file offset: " + block);
+ }
+
+ // We are reading the next block without block type validation, because
+ // it might turn out to be a non-data block.
+ curBlock = reader.readBlock(curBlock.getOffset()
+ + curBlock.getOnDiskSizeWithHeader(),
+ curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+ isCompaction, true, null, getEffectiveDataBlockEncoding());
+ } while (!curBlock.getBlockType().isData());
+
+ return curBlock;
+ }
+
+ public DataBlockEncoding getEffectiveDataBlockEncoding() {
+ return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction);
+ }
+ /**
+ * Compare the given key against the current key
+ * @param comparator
+ * @param key
+ * @param offset
+ * @param length
+     * @return -1 if the passed key is smaller than the current key, 0 if equal, and 1 if greater
+ */
+ public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
+ int length);
+
+ public abstract int compareKey(KVComparator comparator, Cell kv);
+ }
+
/**
- * Scanner that operates on encoded data blocks.
+ * Implementation of {@link HFileScanner} interface.
*/
- protected static class EncodedScanner extends HFileScannerImpl {
+ protected static class ScannerV2 extends AbstractScannerV2 {
+ private HFileReaderV2 reader;
+
+ public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ super(r, cacheBlocks, pread, isCompaction);
+ this.reader = r;
+ }
+
+ @Override
+ public Cell getKeyValue() {
+ if (!isSeeked())
+ return null;
+
+ KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position(), getCellBufSize());
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ ret.setSequenceId(currMemstoreTS);
+ }
+ return ret;
+ }
+
+ protected int getCellBufSize() {
+ return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
+ }
+
+ @Override
+ public ByteBuffer getKey() {
+ assertSeeked();
+ return ByteBuffer.wrap(
+ blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
+ }
+
+ @Override
+ public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
+ return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
+ }
+
+ @Override
+ public ByteBuffer getValue() {
+ assertSeeked();
+ return ByteBuffer.wrap(
+ blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
+ }
+
+ protected void setNonSeekedState() {
+ block = null;
+ blockBuffer = null;
+ currKeyLen = 0;
+ currValueLen = 0;
+ currMemstoreTS = 0;
+ currMemstoreTSLen = 0;
+ }
+
+ /**
+ * Go to the next key/value in the block section. Loads the next block if
+ * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
+ * be called.
+ *
+ * @return true if successfully navigated to the next key/value
+ */
+ @Override
+ public boolean next() throws IOException {
+ assertSeeked();
+
+ try {
+ blockBuffer.position(getNextCellStartPosition());
+ } catch (IllegalArgumentException e) {
+ LOG.error("Current pos = " + blockBuffer.position()
+ + "; currKeyLen = " + currKeyLen + "; currValLen = "
+ + currValueLen + "; block limit = " + blockBuffer.limit()
+ + "; HFile name = " + reader.getName()
+ + "; currBlock currBlockOffset = " + block.getOffset());
+ throw e;
+ }
+
+ if (blockBuffer.remaining() <= 0) {
+ long lastDataBlockOffset =
+ reader.getTrailer().getLastDataBlockOffset();
+
+ if (block.getOffset() >= lastDataBlockOffset) {
+ setNonSeekedState();
+ return false;
+ }
+
+ // read the next block
+ HFileBlock nextBlock = readNextDataBlock();
+ if (nextBlock == null) {
+ setNonSeekedState();
+ return false;
+ }
+
+ updateCurrBlock(nextBlock);
+ return true;
+ }
+
+ // We are still in the same block.
+ readKeyValueLen();
+ return true;
+ }
+
+ protected int getNextCellStartPosition() {
+ return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
+ + currMemstoreTSLen;
+ }
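Reviewer note: a worked example of the position arithmetic above (all numbers made up):

```java
// Hypothetical v2 cell: 24-byte key, 100-byte value, 1-byte mvcc vlong.
int keyValueLenSize = 8;   // KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT
int currKeyLen = 24, currValueLen = 100, currMemstoreTSLen = 1;
int delta = keyValueLenSize + currKeyLen + currValueLen + currMemstoreTSLen;
// delta == 133: the next cell starts 133 bytes past the current position.
```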
+
+ /**
+ * Positions this scanner at the start of the file.
+ *
+ * @return false if empty file; i.e. a call to next would return false and
+ * the current key and value are undefined.
+ * @throws IOException
+ */
+ @Override
+ public boolean seekTo() throws IOException {
+ if (reader == null) {
+ return false;
+ }
+
+ if (reader.getTrailer().getEntryCount() == 0) {
+ // No data blocks.
+ return false;
+ }
+
+ long firstDataBlockOffset =
+ reader.getTrailer().getFirstDataBlockOffset();
+ if (block != null && block.getOffset() == firstDataBlockOffset) {
+ blockBuffer.rewind();
+ readKeyValueLen();
+ return true;
+ }
+
+ block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+ isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
+ if (block.getOffset() < 0) {
+ throw new IOException("Invalid block offset: " + block.getOffset());
+ }
+ updateCurrBlock(block);
+ return true;
+ }
+
+ @Override
+ protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
+ boolean rewind, Cell key, boolean seekBefore) throws IOException {
+ if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+ updateCurrBlock(seekToBlock);
+ } else if (rewind) {
+ blockBuffer.rewind();
+ }
+
+ // Update the nextIndexedKey
+ this.nextIndexedKey = nextIndexedKey;
+ return blockSeek(key, seekBefore);
+ }
+
+ /**
+ * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+ *
+ * @param newBlock the block to make current
+ */
+ protected void updateCurrBlock(HFileBlock newBlock) {
+ block = newBlock;
+
+ // sanity check
+ if (block.getBlockType() != BlockType.DATA) {
+ throw new IllegalStateException("ScannerV2 works only on data " +
+ "blocks, got " + block.getBlockType() + "; " +
+ "fileName=" + reader.name + ", " +
+ "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
+ "isCompaction=" + isCompaction);
+ }
+
+ blockBuffer = block.getBufferWithoutHeader();
+ readKeyValueLen();
+ blockFetches++;
+
+ // Reset the next indexed key
+ this.nextIndexedKey = null;
+ }
+
+    protected void readKeyValueLen() {
+      blockBuffer.mark();
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      // Validate before skipping so a corrupt length surfaces as an
+      // IllegalStateException instead of a raw buffer bounds error.
+      if (currKeyLen < 0 || currValueLen < 0
+          || currKeyLen > blockBuffer.limit()
+          || currValueLen > blockBuffer.limit()) {
+        throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
+            + " or currValueLen " + currValueLen + ". Block offset: "
+            + block.getOffset() + ", block length: " + blockBuffer.limit()
+            + ", position: " + blockBuffer.position() + " (without header).");
+      }
+      ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
+      readMvccVersion();
+      blockBuffer.reset();
+    }
+
+ protected void readMvccVersion() {
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ if (this.reader.decodeMemstoreTS) {
+ try {
+ currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position());
+ currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading memstore timestamp", e);
+ }
+ } else {
+ currMemstoreTS = 0;
+ currMemstoreTSLen = 1;
+ }
+ }
+ }
+
+ /**
+ * Within a loaded block, seek looking for the last key that is smaller than
+ * (or equal to?) the key we are interested in.
+ *
+     * A note on seekBefore: if seekBefore = true AND the first key in the
+     * block equals the given key, an exception will be thrown. The caller has
+     * to check for that case and load the previous block as appropriate.
+ *
+ * @param key
+ * the key to find
+ * @param seekBefore
+ * find the key before the given key in case of exact match.
+ * @return 0 in case of an exact key match, 1 in case of an inexact match,
+     *         -2 in case of an inexact match where, furthermore, the input key is
+     *         less than the first key of the current block (e.g. when using a
+     *         faked index key)
+ */
+ protected int blockSeek(Cell key, boolean seekBefore) {
+ int klen, vlen;
+ long memstoreTS = 0;
+ int memstoreTSLen = 0;
+ int lastKeyValueSize = -1;
+ KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue();
+ do {
+ blockBuffer.mark();
+ klen = blockBuffer.getInt();
+ vlen = blockBuffer.getInt();
+ blockBuffer.reset();
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ if (this.reader.decodeMemstoreTS) {
+ try {
+ int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE + klen + vlen;
+ memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+ memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading memstore timestamp", e);
+ }
+ } else {
+ memstoreTS = 0;
+ memstoreTSLen = 1;
+ }
+ }
+
+ int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE;
+ keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen);
+ int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv);
+
+ if (comp == 0) {
+ if (seekBefore) {
+ if (lastKeyValueSize < 0) {
+ throw new IllegalStateException("blockSeek with seekBefore "
+ + "at the first key of the block: key="
+ + CellUtil.getCellKeyAsString(key)
+ + ", blockOffset=" + block.getOffset() + ", onDiskSize="
+ + block.getOnDiskSizeWithHeader());
+ }
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1; // non exact match.
+ }
+ currKeyLen = klen;
+ currValueLen = vlen;
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ currMemstoreTS = memstoreTS;
+ currMemstoreTSLen = memstoreTSLen;
+ }
+ return 0; // indicate exact match
+ } else if (comp < 0) {
+ if (lastKeyValueSize > 0)
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ if (lastKeyValueSize == -1 && blockBuffer.position() == 0
+ && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
+ return HConstants.INDEX_KEY_MAGIC;
+ }
+ return 1;
+ }
+
+ // The size of this key/value tuple, including key/value length fields.
+ lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
+ blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
+ } while (blockBuffer.remaining() > 0);
+
+ // Seek to the last key we successfully read. This will happen if this is
+ // the last key/value pair in the file, in which case the following call
+ // to next() has to return false.
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1; // didn't exactly find it.
+ }
+
+ @Override
+ protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+ ByteBuffer buffer = curBlock.getBufferWithoutHeader();
+ // It is safe to manipulate this buffer because we own the buffer object.
+ buffer.rewind();
+ int klen = buffer.getInt();
+ buffer.getInt();
+ ByteBuffer keyBuff = buffer.slice();
+ keyBuff.limit(klen);
+ keyBuff.rewind();
+ return keyBuff;
+ }
+
+ @Override
+ public String getKeyString() {
+ return Bytes.toStringBinary(blockBuffer.array(),
+ blockBuffer.arrayOffset() + blockBuffer.position()
+ + KEY_VALUE_LEN_SIZE, currKeyLen);
+ }
+
+ @Override
+ public String getValueString() {
+ return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+ currValueLen);
+ }
+
+ @Override
+ public int compareKey(KVComparator comparator, Cell key) {
+ return comparator.compareOnlyKeyPortion(
+ key,
+ new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
+ }
+ }
+
+ /**
+   * Scanner that operates on encoded data blocks.
+ */
+ protected static class EncodedScannerV2 extends AbstractScannerV2 {
private final HFileBlockDecodingContext decodingCtx;
private final DataBlockEncoder.EncodedSeeker seeker;
private final DataBlockEncoder dataBlockEncoder;
protected final HFileContext meta;
- public EncodedScanner(HFile.Reader reader, boolean cacheBlocks,
+ public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
boolean pread, boolean isCompaction, HFileContext meta) {
super(reader, cacheBlocks, pread, isCompaction);
- DataBlockEncoding encoding = reader.getDataBlockEncoding();
+ DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding();
dataBlockEncoder = encoding.getEncoder();
decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
seeker = dataBlockEncoder.createSeeker(
@@ -1465,6 +1191,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return seeker.getKeyDeepCopy();
}
+ @Override
public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
return seeker.compareKey(comparator, key, offset, length);
}
@@ -1503,10 +1230,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
}
+ @Override
protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
}
+ @Override
protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
boolean rewind, Cell key, boolean seekBefore) throws IOException {
if (block == null || block.getOffset() != seekToBlock.getOffset()) {
@@ -1518,6 +1247,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return seeker.seekToKeyInBlock(key, seekBefore);
}
+ @Override
public int compareKey(KVComparator comparator, Cell key) {
return seeker.compareKey(comparator, key);
}
@@ -1551,6 +1281,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return null;
}
+ @Override
public boolean isFileInfoLoaded() {
return true; // We load file info in constructor in version 2.
}
@@ -1571,6 +1302,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
}
+ @Override
+ public int getMajorVersion() {
+ return 2;
+ }
+
@Override
public HFileContext getFileContext() {
return hfileContext;
@@ -1581,108 +1317,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* not completed, true otherwise
*/
@VisibleForTesting
- public boolean prefetchComplete() {
+ boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}
-
- protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
- HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
- HFileContextBuilder builder = new HFileContextBuilder()
- .withIncludesMvcc(this.includesMemstoreTS)
- .withHBaseCheckSum(true)
- .withCompression(this.compressAlgo);
-
- // Check for any key material available
- byte[] keyBytes = trailer.getEncryptionKey();
- if (keyBytes != null) {
- Encryption.Context cryptoContext = Encryption.newContext(conf);
- Key key;
- String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
- User.getCurrent().getShortName());
- try {
- // First try the master key
- key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
- } catch (KeyException e) {
- // If the current master key fails to unwrap, try the alternate, if
- // one is configured
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
- }
- String alternateKeyName =
- conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
- if (alternateKeyName != null) {
- try {
- key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
- } catch (KeyException ex) {
- throw new IOException(ex);
- }
- } else {
- throw new IOException(e);
- }
- }
- // Use the algorithm the key wants
- Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
- if (cipher == null) {
- throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
- }
- cryptoContext.setCipher(cipher);
- cryptoContext.setKey(key);
- builder.withEncryptionContext(cryptoContext);
- }
-
- HFileContext context = builder.build();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reader" + (path != null ? " for " + path : "" ) +
- " initialized with cacheConf: " + cacheConf +
- " comparator: " + comparator.getClass().getSimpleName() +
- " fileContext: " + context);
- }
-
- return context;
- }
-
- /**
- * Create a Scanner on this file. No seeks or reads are done on creation. Call
- * {@link HFileScanner#seekTo(Cell)} to position and start the read. There is
- * nothing to clean up in a Scanner. Letting go of your references to the
- * scanner is sufficient. NOTE: Do not use this overload of getScanner for
- * compactions. See {@link #getScanner(boolean, boolean, boolean)}
- *
- * @param cacheBlocks True if we should cache blocks read in by this scanner.
- * @param pread Use positional read rather than seek+read if true (pread is
- * better for random reads, seek+read is better scanning).
- * @return Scanner on this file.
- */
- @Override
- public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
- return getScanner(cacheBlocks, pread, false);
- }
-
- /**
- * Create a Scanner on this file. No seeks or reads are done on creation. Call
- * {@link HFileScanner#seekTo(Cell)} to position and start the read. There is
- * nothing to clean up in a Scanner. Letting go of your references to the
- * scanner is sufficient.
- * @param cacheBlocks
- * True if we should cache blocks read in by this scanner.
- * @param pread
- * Use positional read rather than seek+read if true (pread is better
- * for random reads, seek+read is better scanning).
- * @param isCompaction
- * is scanner being used for a compaction?
- * @return Scanner on this file.
- */
- @Override
- public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
- final boolean isCompaction) {
- if (dataBlockEncoder.useEncodedScanner()) {
- return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext);
- }
- return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction);
- }
-
- public int getMajorVersion() {
- return 3;
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
new file mode 100644
index 00000000000..b28d8c1a875
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.security.Key;
+import java.security.KeyException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * {@link HFile} reader for version 3.
+ */
+@InterfaceAudience.Private
+public class HFileReaderV3 extends HFileReaderV2 {
+
+ private static final Log LOG = LogFactory.getLog(HFileReaderV3.class);
+
+ public static final int MAX_MINOR_VERSION = 0;
+
+ /**
+   * Opens an HFile. You must load the index before you can use it by calling
+ * {@link #loadFileInfo()}.
+ * @param path
+ * Path to HFile.
+ * @param trailer
+ * File trailer.
+ * @param fsdis
+ * input stream.
+ * @param size
+ * Length of the stream.
+ * @param cacheConf
+ * Cache configuration.
+ * @param hfs
+ * The file system.
+ * @param conf
+ * Configuration
+ */
+ public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
+ final long size, final CacheConfig cacheConf, final HFileSystem hfs,
+ final Configuration conf) throws IOException {
+ super(path, trailer, fsdis, size, cacheConf, hfs, conf);
+ byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
+    // If the max tag length is not present in the HFile, tags were not written to the file at all.
+ if (tmp != null) {
+ hfileContext.setIncludesTags(true);
+ tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
+ if (tmp != null && Bytes.toBoolean(tmp)) {
+ hfileContext.setCompressTags(true);
+ }
+ }
+ }
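Reviewer note: for orientation, the writer-side counterpart of the two file-info flags consulted above, sketched under the assumption that FileInfo.append keeps the signature used elsewhere in this codebase:

```java
// Presence of MAX_TAGS_LEN marks the file as carrying tags; TAGS_COMPRESSED
// records whether tag dictionary compression was used.
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(maxTagsLen), false);
fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(true), false);
```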
+
+ @Override
+ protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
+ HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
+ trailer.expectMajorVersion(3);
+ HFileContextBuilder builder = new HFileContextBuilder()
+ .withIncludesMvcc(this.includesMemstoreTS)
+ .withHBaseCheckSum(true)
+ .withCompression(this.compressAlgo);
+
+ // Check for any key material available
+ byte[] keyBytes = trailer.getEncryptionKey();
+ if (keyBytes != null) {
+ Encryption.Context cryptoContext = Encryption.newContext(conf);
+ Key key;
+ String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
+ User.getCurrent().getShortName());
+ try {
+ // First try the master key
+ key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
+ } catch (KeyException e) {
+ // If the current master key fails to unwrap, try the alternate, if
+ // one is configured
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
+ }
+ String alternateKeyName =
+ conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+ if (alternateKeyName != null) {
+ try {
+ key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
+ } catch (KeyException ex) {
+ throw new IOException(ex);
+ }
+ } else {
+ throw new IOException(e);
+ }
+ }
+ // Use the algorithm the key wants
+ Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
+ if (cipher == null) {
+ throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
+ }
+ cryptoContext.setCipher(cipher);
+ cryptoContext.setKey(key);
+ builder.withEncryptionContext(cryptoContext);
+ }
+
+ HFileContext context = builder.build();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reader" + (path != null ? " for " + path : "" ) +
+ " initialized with cacheConf: " + cacheConf +
+ " comparator: " + comparator.getClass().getSimpleName() +
+ " fileContext: " + context);
+ }
+
+ return context;
+ }
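Reviewer note: the master/alternate key fallback above exists to keep files readable across a master key rotation. A hedged configuration sketch; the key names and the wrapper class are illustrative, only the HConstants config keys come from the code above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

final class KeyRotationConfigSketch {
  static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    // Key the cluster currently wraps HFile data keys with.
    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "current-master-key");
    // Optional fallback, consulted only on the KeyException path above,
    // e.g. while a key rotation is still in flight.
    conf.set(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY, "previous-master-key");
    return conf;
  }
}
```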
+
+ /**
+ * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+ * nothing to clean up in a Scanner. Letting go of your references to the
+ * scanner is sufficient.
+ * @param cacheBlocks
+ * True if we should cache blocks read in by this scanner.
+ * @param pread
+ * Use positional read rather than seek+read if true (pread is better
+ * for random reads, seek+read is better scanning).
+ * @param isCompaction
+ * is scanner being used for a compaction?
+ * @return Scanner on this file.
+ */
+ @Override
+ public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+ final boolean isCompaction) {
+ if (dataBlockEncoder.useEncodedScanner()) {
+ return new EncodedScannerV3(this, cacheBlocks, pread, isCompaction, this.hfileContext);
+ }
+ return new ScannerV3(this, cacheBlocks, pread, isCompaction);
+ }
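Reviewer note: which branch above runs is decided by the column family's data block encoding, fixed at write time. A hedged fragment showing the knob involved (the family name is made up):

```java
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

HColumnDescriptor cf = new HColumnDescriptor("d");
cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // files read via EncodedScannerV3
// With DataBlockEncoding.NONE (the default), ScannerV3 is used instead.
```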
+
+ /**
+ * Implementation of {@link HFileScanner} interface.
+ */
+ protected static class ScannerV3 extends ScannerV2 {
+
+ private HFileReaderV3 reader;
+ private int currTagsLen;
+
+ public ScannerV3(HFileReaderV3 r, boolean cacheBlocks, final boolean pread,
+ final boolean isCompaction) {
+ super(r, cacheBlocks, pread, isCompaction);
+ this.reader = r;
+ }
+
+ @Override
+ protected int getCellBufSize() {
+ int kvBufSize = super.getCellBufSize();
+ if (reader.hfileContext.isIncludesTags()) {
+ kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen;
+ }
+ return kvBufSize;
+ }
+
+    @Override
+    protected void setNonSeekedState() {
+ super.setNonSeekedState();
+ currTagsLen = 0;
+ }
+
+ @Override
+ protected int getNextCellStartPosition() {
+ int nextKvPos = super.getNextCellStartPosition();
+ if (reader.hfileContext.isIncludesTags()) {
+ nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
+ }
+ return nextKvPos;
+ }
+
+    @Override
+    protected void readKeyValueLen() {
+ blockBuffer.mark();
+ currKeyLen = blockBuffer.getInt();
+ currValueLen = blockBuffer.getInt();
+ if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit()
+ || currValueLen > blockBuffer.limit()) {
+ throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen "
+ + currValueLen + ". Block offset: "
+ + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ + blockBuffer.position() + " (without header).");
+ }
+ ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
+ if (reader.hfileContext.isIncludesTags()) {
+ // Read short as unsigned, high byte first
+ currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
+ if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
+ throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
+ + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ + blockBuffer.position() + " (without header).");
+ }
+ ByteBufferUtils.skip(blockBuffer, currTagsLen);
+ }
+ readMvccVersion();
+ blockBuffer.reset();
+ }
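Reviewer note: the `((b1 & 0xff) << 8) ^ (b2 & 0xff)` idiom above decodes a 2-byte big-endian unsigned value despite Java's signed bytes. A worked example:

```java
byte b1 = (byte) 0x01, b2 = (byte) 0x2c;        // on-disk bytes 0x01 0x2C
int tagsLen = ((b1 & 0xff) << 8) ^ (b2 & 0xff); // == 300
// XOR and OR are interchangeable here because the operand bit ranges
// (bits 8..15 vs bits 0..7) never overlap.
```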
+
+ /**
+ * Within a loaded block, seek looking for the last key that is smaller than
+ * (or equal to?) the key we are interested in.
+     * A note on seekBefore: if seekBefore = true AND the first key in the
+     * block equals the given key, an exception will be thrown. The caller has
+     * to check for that case and load the previous block as appropriate.
+ * @param key
+ * the key to find
+ * @param seekBefore
+ * find the key before the given key in case of exact match.
+ * @return 0 in case of an exact key match, 1 in case of an inexact match,
+     *         -2 in case of an inexact match where, furthermore, the input key is
+     *         less than the first key of the current block (e.g. when using a
+     *         faked index key)
+ */
+ @Override
+ protected int blockSeek(Cell key, boolean seekBefore) {
+ int klen, vlen, tlen = 0;
+ long memstoreTS = 0;
+ int memstoreTSLen = 0;
+ int lastKeyValueSize = -1;
+ KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
+ do {
+ blockBuffer.mark();
+ klen = blockBuffer.getInt();
+ vlen = blockBuffer.getInt();
+ if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
+ || vlen > blockBuffer.limit()) {
+ throw new IllegalStateException("Invalid klen " + klen + " or vlen "
+ + vlen + ". Block offset: "
+ + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ + blockBuffer.position() + " (without header).");
+ }
+ ByteBufferUtils.skip(blockBuffer, klen + vlen);
+ if (reader.hfileContext.isIncludesTags()) {
+ // Read short as unsigned, high byte first
+ tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
+ if (tlen < 0 || tlen > blockBuffer.limit()) {
+ throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
+ + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ + blockBuffer.position() + " (without header).");
+ }
+ ByteBufferUtils.skip(blockBuffer, tlen);
+ }
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ if (this.reader.decodeMemstoreTS) {
+ try {
+ memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
+ + blockBuffer.position());
+ memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading memstore timestamp", e);
+ }
+ } else {
+ memstoreTS = 0;
+ memstoreTSLen = 1;
+ }
+ }
+ blockBuffer.reset();
+ int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
+ keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
+ int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv);
+
+ if (comp == 0) {
+ if (seekBefore) {
+ if (lastKeyValueSize < 0) {
+ throw new IllegalStateException("blockSeek with seekBefore "
+ + "at the first key of the block: key="
+ + CellUtil.getCellKeyAsString(key)
+ + ", blockOffset=" + block.getOffset() + ", onDiskSize="
+ + block.getOnDiskSizeWithHeader());
+ }
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1; // non exact match.
+ }
+ currKeyLen = klen;
+ currValueLen = vlen;
+ currTagsLen = tlen;
+ if (this.reader.shouldIncludeMemstoreTS()) {
+ currMemstoreTS = memstoreTS;
+ currMemstoreTSLen = memstoreTSLen;
+ }
+ return 0; // indicate exact match
+ } else if (comp < 0) {
+ if (lastKeyValueSize > 0)
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
+ return HConstants.INDEX_KEY_MAGIC;
+ }
+ return 1;
+ }
+
+ // The size of this key/value tuple, including key/value length fields.
+ lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
+ // include tag length also if tags included with KV
+ if (reader.hfileContext.isIncludesTags()) {
+ lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
+ }
+ blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
+ } while (blockBuffer.remaining() > 0);
+
+ // Seek to the last key we successfully read. This will happen if this is
+ // the last key/value pair in the file, in which case the following call
+ // to next() has to return false.
+ blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+ readKeyValueLen();
+ return 1; // didn't exactly find it.
+ }
+ }
+
+ /**
+   * Scanner that operates on encoded data blocks.
+ */
+ protected static class EncodedScannerV3 extends EncodedScannerV2 {
+ public EncodedScannerV3(HFileReaderV3 reader, boolean cacheBlocks, boolean pread,
+ boolean isCompaction, HFileContext context) {
+ super(reader, cacheBlocks, pread, isCompaction, context);
+ }
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return 3;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java
deleted file mode 100644
index 047022d9043..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-
-public class HFileWriterFactory extends HFile.WriterFactory {
- HFileWriterFactory(Configuration conf, CacheConfig cacheConf) {
- super(conf, cacheConf);
- }
-
- @Override
- public HFile.Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
- KVComparator comparator, HFileContext context)
- throws IOException {
- return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, context);
- }
-}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
similarity index 55%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index e2b6efdac46..28c4655271b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,7 +22,6 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -31,105 +31,40 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
/**
- * Common functionality needed by all versions of {@link HFile} writers.
+ * Writes HFile format version 2.
*/
@InterfaceAudience.Private
-public class HFileWriterImpl implements HFile.Writer {
- private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
+public class HFileWriterV2 extends AbstractHFileWriter {
+ static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
- /** The Cell previously appended. Becomes the last cell in the file.*/
- protected Cell lastCell = null;
-
- /** FileSystem stream to write into. */
- protected FSDataOutputStream outputStream;
-
- /** True if we opened the outputStream (and so will close it). */
- protected final boolean closeOutputStream;
-
- /** A "file info" block: a key-value map of file-wide metadata. */
- protected FileInfo fileInfo = new HFile.FileInfo();
-
- /** Total # of key/value entries, i.e. how many times add() was called. */
- protected long entryCount = 0;
-
- /** Used for calculating the average key length. */
- protected long totalKeyLength = 0;
-
- /** Used for calculating the average value length. */
- protected long totalValueLength = 0;
-
- /** Total uncompressed bytes, maybe calculate a compression ratio later. */
- protected long totalUncompressedBytes = 0;
-
- /** Key comparator. Used to ensure we write in order. */
- protected final KVComparator comparator;
-
- /** Meta block names. */
- protected List<byte[]> metaNames = new ArrayList<byte[]>();
-
- /** {@link Writable}s representing meta block data. */
- protected List<Writable> metaData = new ArrayList<Writable>();
-
- /**
- * First cell in a block.
- * This reference should be short-lived since we write hfiles in a burst.
- */
- protected Cell firstCellInBlock = null;
-
-
- /** May be null if we were passed a stream. */
- protected final Path path;
-
- /** Cache configuration for caching data on write. */
- protected final CacheConfig cacheConf;
-
- /**
- * Name for this object used when logging or in toString. Is either
- * the result of a toString on stream or else name of passed file Path.
- */
- protected final String name;
-
- /**
- * The data block encoding which will be used.
- * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
- */
- protected final HFileDataBlockEncoder blockEncoder;
-
- protected final HFileContext hFileContext;
-
- private int maxTagsLength = 0;
+ /** Max memstore (mvcc) timestamp in FileInfo */
+ public static final byte [] MAX_MEMSTORE_TS_KEY =
+ Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
/** KeyValue version in FileInfo */
- public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+ public static final byte [] KEY_VALUE_VERSION =
+ Bytes.toBytes("KEY_VALUE_VERSION");
/** Version for KeyValue which includes memstore timestamp */
public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
/** Inline block writers for multi-level block index and compound Blooms. */
- private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<InlineBlockWriter>();
+ private List<InlineBlockWriter> inlineBlockWriters =
+ new ArrayList<InlineBlockWriter>();
- /** block writer */
+ /** Unified version 2 block writer */
protected HFileBlock.Writer fsBlockWriter;
private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
@@ -148,135 +83,40 @@ public class HFileWriterImpl implements HFile.Writer {
private Cell lastCellOfPreviousBlock = null;
/** Additional data items to be written to the "load-on-open" section. */
- private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<BlockWritable>();
+ private List<BlockWritable> additionalLoadOnOpenData =
+ new ArrayList<BlockWritable>();
protected long maxMemstoreTS = 0;
- public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
- FSDataOutputStream outputStream,
- KVComparator comparator, HFileContext fileContext) {
- this.outputStream = outputStream;
- this.path = path;
- this.name = path != null ? path.getName() : outputStream.toString();
- this.hFileContext = fileContext;
- DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
- if (encoding != DataBlockEncoding.NONE) {
- this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
- } else {
- this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
+ static class WriterFactoryV2 extends HFile.WriterFactory {
+ WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
+ super(conf, cacheConf);
}
- this.comparator = comparator != null ? comparator
- : KeyValue.COMPARATOR;
- closeOutputStream = path != null;
- this.cacheConf = cacheConf;
- finishInit(conf);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Writer" + (path != null ? " for " + path : "") +
- " initialized with cacheConf: " + cacheConf +
- " comparator: " + comparator.getClass().getSimpleName() +
- " fileContext: " + fileContext);
- }
- }
-
- /**
- * Add to the file info. All added key/value pairs can be obtained using
- * {@link HFile.Reader#loadFileInfo()}.
- *
- * @param k Key
- * @param v Value
- * @throws IOException in case the key or the value are invalid
- */
- @Override
- public void appendFileInfo(final byte[] k, final byte[] v)
- throws IOException {
- fileInfo.append(k, v, true);
- }
-
- /**
- * Sets the file info offset in the trailer, finishes up populating fields in
- * the file info, and writes the file info into the given data output. The
- * reason the data output is not always {@link #outputStream} is that we store
- * file info as a block in version 2.
- *
- * @param trailer fixed file trailer
- * @param out the data output to write the file info to
- * @throws IOException
- */
- protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
- throws IOException {
- trailer.setFileInfoOffset(outputStream.getPos());
- finishFileInfo();
- fileInfo.write(out);
- }
-
- /**
- * Checks that the given Cell's key does not violate the key order.
- *
- * @param cell Cell whose key to check.
- * @return true if the key is duplicate
- * @throws IOException if the key or the key order is wrong
- */
- protected boolean checkKey(final Cell cell) throws IOException {
- boolean isDuplicateKey = false;
-
- if (cell == null) {
- throw new IOException("Key cannot be null or empty");
- }
- if (lastCell != null) {
- int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
-
- if (keyComp > 0) {
- throw new IOException("Added a key not lexically larger than"
- + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
- } else if (keyComp == 0) {
- isDuplicateKey = true;
+ @Override
+ public Writer createWriter(FileSystem fs, Path path,
+ FSDataOutputStream ostream,
+ KVComparator comparator, HFileContext context) throws IOException {
+ context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
+ return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
+ comparator, context);
}
}
- return isDuplicateKey;
- }
- /** Checks the given value for validity. */
- protected void checkValue(final byte[] value, final int offset,
- final int length) throws IOException {
- if (value == null) {
- throw new IOException("Value cannot be null");
- }
- }
-
- /**
- * @return Path or null if we were passed a stream rather than a Path.
- */
- @Override
- public Path getPath() {
- return path;
- }
-
- @Override
- public String toString() {
- return "writer=" + (path != null ? path.toString() : null) + ", name="
- + name + ", compression=" + hFileContext.getCompression().getName();
- }
-
- public static Compression.Algorithm compressionByName(String algoName) {
- if (algoName == null)
- return HFile.DEFAULT_COMPRESSION_ALGORITHM;
- return Compression.getCompressionAlgorithmByName(algoName);
- }
-
- /** A helper method to create HFile output streams in constructors */
- protected static FSDataOutputStream createOutputStream(Configuration conf,
- FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
- FsPermission perms = FSUtils.getFilePermissions(fs, conf,
- HConstants.DATA_FILE_UMASK_KEY);
- return FSUtils.create(fs, path, perms, favoredNodes);
+ /** Constructor that takes a path, creates and closes the output stream. */
+ public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
+ FileSystem fs, Path path, FSDataOutputStream ostream,
+ final KVComparator comparator, final HFileContext context) throws IOException {
+ super(cacheConf,
+ ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
+ path, comparator, context);
+ finishInit(conf);
}
/** Additional initialization steps */
protected void finishInit(final Configuration conf) {
- if (fsBlockWriter != null) {
+ if (fsBlockWriter != null)
throw new IllegalStateException("finishInit called twice");
- }
fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
@@ -300,7 +140,9 @@ public class HFileWriterImpl implements HFile.Writer {
* @throws IOException
*/
protected void checkBlockBoundary() throws IOException {
- if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return;
+ if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
+ return;
+
finishBlock();
writeInlineBlocks(false);
newBlock();
@@ -308,7 +150,8 @@ public class HFileWriterImpl implements HFile.Writer {
/** Clean up the current data block */
private void finishBlock() throws IOException {
- if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return;
+ if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
+ return;
// Update the first data block offset for scanning.
if (firstDataBlockOffset == -1) {
@@ -318,6 +161,7 @@ public class HFileWriterImpl implements HFile.Writer {
lastDataBlockOffset = outputStream.getPos();
fsBlockWriter.writeHeaderAndData(outputStream);
int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
+
Cell indexEntry =
CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
@@ -355,7 +199,8 @@ public class HFileWriterImpl implements HFile.Writer {
*/
private void doCacheOnWrite(long offset) {
HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
- cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock);
+ cacheConf.getBlockCache().cacheBlock(
+ new BlockCacheKey(name, offset), cacheFormatBlock);
}
/**
@@ -399,6 +244,48 @@ public class HFileWriterImpl implements HFile.Writer {
metaData.add(i, content);
}
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param cell Cell to add. Cannot be empty nor null.
+ * @throws IOException
+ */
+ @Override
+ public void append(final Cell cell) throws IOException {
+ byte[] value = cell.getValueArray();
+ int voffset = cell.getValueOffset();
+ int vlength = cell.getValueLength();
+ // checkKey uses comparator to check we are writing in order.
+ boolean dupKey = checkKey(cell);
+ checkValue(value, voffset, vlength);
+ if (!dupKey) {
+ checkBlockBoundary();
+ }
+
+ if (!fsBlockWriter.isWriting()) {
+ newBlock();
+ }
+
+ fsBlockWriter.write(cell);
+
+ totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
+ totalValueLength += vlength;
+
+ // Are we the first key in this block?
+ if (firstCellInBlock == null) {
+ // If cell is big, block will be closed and this firstCellInBlock reference will only last
+ // a short while.
+ firstCellInBlock = cell;
+ }
+
+ // TODO: What if cell is 10MB and we write infrequently? We'll hold on to the cell here
+ // indefinitely?
+ lastCell = cell;
+ entryCount++;
+ this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
+ }
+
@Override
public void close() throws IOException {
if (outputStream == null) {
@@ -522,120 +409,16 @@ public class HFileWriterImpl implements HFile.Writer {
});
}
+ protected int getMajorVersion() {
+ return 2;
+ }
+
+ protected int getMinorVersion() {
+ return HFileReaderV2.MAX_MINOR_VERSION;
+ }
+
@Override
public HFileContext getFileContext() {
return hFileContext;
}
-
- /**
- * Add key/value to file. Keys must be added in an order that agrees with the
- * Comparator passed on construction.
- *
- * @param cell
- * Cell to add. Cannot be empty nor null.
- * @throws IOException
- */
- @Override
- public void append(final Cell cell) throws IOException {
- byte[] value = cell.getValueArray();
- int voffset = cell.getValueOffset();
- int vlength = cell.getValueLength();
- // checkKey uses comparator to check we are writing in order.
- boolean dupKey = checkKey(cell);
- checkValue(value, voffset, vlength);
- if (!dupKey) {
- checkBlockBoundary();
- }
-
- if (!fsBlockWriter.isWriting()) {
- newBlock();
- }
-
- fsBlockWriter.write(cell);
-
- totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
- totalValueLength += vlength;
-
- // Are we the first key in this block?
- if (firstCellInBlock == null) {
- // If cell is big, block will be closed and this firstCellInBlock reference will only last
- // a short while.
- firstCellInBlock = cell;
- }
-
- // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinetly?
- lastCell = cell;
- entryCount++;
- this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
- int tagsLength = cell.getTagsLength();
- if (tagsLength > this.maxTagsLength) {
- this.maxTagsLength = tagsLength;
- }
- }
-
- protected void finishFileInfo() throws IOException {
- if (lastCell != null) {
- // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
- // byte buffer. Won't take a tuple.
- byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
- fileInfo.append(FileInfo.LASTKEY, lastKey, false);
- }
-
- // Average key length.
- int avgKeyLen =
- entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
- fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
- fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
- false);
-
- // Average value length.
- int avgValueLen =
- entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
- fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
- if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
- // In case of Prefix Tree encoding, we always write tags information into HFiles even if all
- // KVs are having no tags.
- fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
- } else if (hFileContext.isIncludesTags()) {
- // When tags are not being written in this file, MAX_TAGS_LEN is excluded
- // from the FileInfo
- fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
- boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
- && hFileContext.isCompressTags();
- fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
- }
- }
-
- protected int getMajorVersion() {
- return 3;
- }
-
- protected int getMinorVersion() {
- return HFileReaderImpl.MAX_MINOR_VERSION;
- }
-
- protected void finishClose(FixedFileTrailer trailer) throws IOException {
- // Write out encryption metadata before finalizing if we have a valid crypto context
- Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
- if (cryptoContext != Encryption.Context.NONE) {
- // Wrap the context's key and write it as the encryption metadata, the wrapper includes
- // all information needed for decryption
- trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
- cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
- User.getCurrent().getShortName()),
- cryptoContext.getKey()));
- }
- // Now we can finish the close
- trailer.setMetaIndexCount(metaNames.size());
- trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
- trailer.setEntryCount(entryCount);
- trailer.setCompressionCodec(hFileContext.getCompression());
-
- trailer.serialize(outputStream);
-
- if (closeOutputStream) {
- outputStream.close();
- outputStream = null;
- }
- }
-}
\ No newline at end of file
+}
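
Note: callers never construct HFileWriterV2 directly; they go through the factory. A hedged usage sketch, mirroring the CompressionTest change later in this patch (conf, fs, path and cell are placeholders):

    HFileContext context = new HFileContextBuilder()
        .withBlockSize(64 * 1024)
        .build();
    HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    writer.append(cell); // cells must arrive in comparator order
    writer.appendFileInfo(Bytes.toBytes("MY_KEY"), Bytes.toBytes("my value"));
    writer.close();
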
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
new file mode 100644
index 00000000000..086395ca6be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * {@link HFile} writer for version 3.
+ */
+@InterfaceAudience.Private
+public class HFileWriterV3 extends HFileWriterV2 {
+
+ private static final Log LOG = LogFactory.getLog(HFileWriterV3.class);
+
+ private int maxTagsLength = 0;
+
+ static class WriterFactoryV3 extends HFile.WriterFactory {
+ WriterFactoryV3(Configuration conf, CacheConfig cacheConf) {
+ super(conf, cacheConf);
+ }
+
+ @Override
+ public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
+ final KVComparator comparator, HFileContext fileContext)
+ throws IOException {
+ return new HFileWriterV3(conf, cacheConf, fs, path, ostream, comparator, fileContext);
+ }
+ }
+
+ /** Constructor that takes a path, creates and closes the output stream. */
+ public HFileWriterV3(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path,
+ FSDataOutputStream ostream, final KVComparator comparator,
+ final HFileContext fileContext) throws IOException {
+ super(conf, cacheConf, fs, path, ostream, comparator, fileContext);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writer" + (path != null ? " for " + path : "") +
+ " initialized with cacheConf: " + cacheConf +
+ " comparator: " + comparator.getClass().getSimpleName() +
+ " fileContext: " + fileContext);
+ }
+ }
+
+ /**
+ * Add key/value to file. Keys must be added in an order that agrees with the
+ * Comparator passed on construction.
+ *
+ * @param cell
+ * Cell to add. Cannot be empty nor null.
+ * @throws IOException
+ */
+ @Override
+ public void append(final Cell cell) throws IOException {
+ // Currently get the complete arrays
+ super.append(cell);
+ int tagsLength = cell.getTagsLength();
+ if (tagsLength > this.maxTagsLength) {
+ this.maxTagsLength = tagsLength;
+ }
+ }
+
+ protected void finishFileInfo() throws IOException {
+ super.finishFileInfo();
+ if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
+ // In case of Prefix Tree encoding, we always write tags information into HFiles even if all
+ // KVs are having no tags.
+ fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+ } else if (hFileContext.isIncludesTags()) {
+ // When tags are not being written in this file, MAX_TAGS_LEN is excluded
+ // from the FileInfo
+ fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+ boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
+ && hFileContext.isCompressTags();
+ fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
+ }
+ }
+
+ @Override
+ protected int getMajorVersion() {
+ return 3;
+ }
+
+ @Override
+ protected int getMinorVersion() {
+ return HFileReaderV3.MAX_MINOR_VERSION;
+ }
+
+ @Override
+ protected void finishClose(FixedFileTrailer trailer) throws IOException {
+ // Write out encryption metadata before finalizing if we have a valid crypto context
+ Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
+ if (cryptoContext != Encryption.Context.NONE) {
+ // Wrap the context's key and write it as the encryption metadata, the wrapper includes
+ // all information needed for decryption
+ trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
+ cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
+ User.getCurrent().getShortName()),
+ cryptoContext.getKey()));
+ }
+ // Now we can finish the close
+ super.finishClose(trailer);
+ }
+
+}
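
Note: HFileWriterV3 only layers tag accounting and encryption metadata on top of the V2 writer; which writer a cluster gets is purely configuration driven. A minimal sketch, assuming HFile.FORMAT_VERSION_KEY is the "hfile.format.version" key used by the test changes below:

    // Version 3 is required for cell tags and HFile encryption.
    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
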
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 26ae0970e2c..f2d5c6f5102 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -129,7 +129,7 @@ public class HFileOutputFormat2
// Invented config. Add to hbase-*.xml if other than default compression.
final String defaultCompressionStr = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
- final Algorithm defaultCompression = HFileWriterImpl
+ final Algorithm defaultCompression = AbstractHFileWriter
.compressionByName(defaultCompressionStr);
final boolean compactionExclude = conf.getBoolean(
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
@@ -483,7 +483,7 @@ public class HFileOutputFormat2
Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
+ Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
compressionMap.put(e.getKey(), algorithm);
}
return compressionMap;
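
Note: compressionByName is a thin lookup from a configured codec name to a Compression.Algorithm, defaulting when the name is null. Typical use, per the call sites above:

    String codecName = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    Compression.Algorithm algo = AbstractHFileWriter.compressionByName(codecName);
    HFileContext context = new HFileContextBuilder().withCompression(algo).build();
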
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a515f8eff8b..d4354b0e67c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -93,7 +93,6 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -494,7 +493,6 @@ public class HRegionServer extends HasThread implements
throws IOException {
this.fsOk = true;
this.conf = conf;
- HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf);
FSUtils.setupShortCircuitRead(this.conf);
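
Note: this hunk drops the region server's startup guard on the configured HFile version. A hypothetical reconstruction of the removed check (MIN_FORMAT_VERSION is assumed to exist alongside MAX_FORMAT_VERSION):

    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
    if (version < HFile.MIN_FORMAT_VERSION || version > HFile.MAX_FORMAT_VERSION) {
      throw new IllegalArgumentException("Invalid HFile version: " + version
          + " (expected to be between " + HFile.MIN_FORMAT_VERSION
          + " and " + HFile.MAX_FORMAT_VERSION + ")");
    }
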
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 0bb2391cb7a..891004286f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver {
*/
long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
- /**
+ /**
* @return map of column family names to max sequence id that was read from storage when this
* region was opened
*/
@@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver {
///////////////////////////////////////////////////////////////////////////
// Metrics
-
+
/** @return read requests count for this region */
long getReadRequestsCount();
@@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver {
/** @return the number of mutations processed bypassing the WAL */
long getNumMutationsWithoutWAL();
-
+
/** @return the size of data processed bypassing the WAL, in bytes */
long getDataInMemoryWithoutWAL();
@@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver {
/**
* This method needs to be called before any public call that reads or
- * modifies data.
+ * modifies data.
* Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed.
@@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver {
/**
* This method needs to be called before any public call that reads or
- * modifies data.
+ * modifies data.
* Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed.
@@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver {
/**
* Perform atomic mutations within the region.
- *
+ *
* @param mutations The list of mutations to perform.
* mutations can contain operations for multiple rows.
* Caller has to ensure that all rows are contained in this region.
@@ -588,7 +588,7 @@ public interface Region extends ConfigurationObserver {
byte[] now) throws IOException;
/**
- * Replace any cell timestamps set to {@link org.apache.hadoop.hbase.HConstants#LATEST_TIMESTAMP}
+ * Replace any cell timestamps set to HConstants#LATEST_TIMESTAMP with the
* provided current timestamp.
* @param values
* @param now
@@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver {
CANNOT_FLUSH_MEMSTORE_EMPTY,
CANNOT_FLUSH
}
-
+
/** @return the detailed result code */
Result getResult();
/** @return true if the memstores were flushed, else false */
boolean isFlushSucceeded();
-
+
/** @return True if the flush requested a compaction, else false */
boolean isCompactionNeeded();
}
@@ -647,7 +647,7 @@ public interface Region extends ConfigurationObserver {
* Synchronously compact all stores in the region.
* <p>This operation could block for a long time, so don't call it from a
* time-sensitive thread.
- * <p>Note that no locks are taken to prevent possible conflicts between
+ * <p>Note that no locks are taken to prevent possible conflicts between
* compaction and splitting activities. The regionserver does not normally compact
* and split in parallel. However by calling this method you may introduce
* unexpected and unhandled concurrency. Don't do this unless you know what
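
Note: the javadoc above insists that closeRegionOperation always follows the opening call. The calling pattern it implies, as a sketch (startRegionOperation is assumed to be the opening method this interface pairs it with):

    region.startRegionOperation();
    try {
      // reads or mutations against the region go here
    } finally {
      // MUST run whether the operation succeeded or failed
      region.closeRegionOperation();
    }
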
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 345dd9b36ce..c1a6b76e3e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -408,7 +409,7 @@ public class StoreFile {
}
this.reader.setSequenceID(this.sequenceid);
- b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
+ b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index ae820b55d82..3c3ea6b1d9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
@@ -142,7 +142,7 @@ public abstract class Compactor {
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
}
else {
- tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
+ tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (tmp != null) {
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
}
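
Note: StoreFile and the Compactor now share the metadata key owned by HFileWriterV2 rather than HFile.Writer. The lookup both sites perform:

    // FileInfo is the byte[]-keyed map of file-wide metadata written at close time.
    byte[] tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
    long maxMVCCReadpoint = (tmp == null) ? 0 : Bytes.toLong(tmp);
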
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index a9cc1c65ed4..cdef12f92e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -120,7 +120,7 @@ public class CompressionTest {
throws Exception {
Configuration conf = HBaseConfiguration.create();
HFileContext context = new HFileContextBuilder()
- .withCompression(HFileWriterImpl.compressionByName(codec)).build();
+ .withCompression(AbstractHFileWriter.compressionByName(codec)).build();
HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
.withPath(fs, path)
.withFileContext(context)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
index cb12bea1e2d..ea10f602151 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.crypto.aes.AES;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -326,7 +326,7 @@ public class HFilePerformanceEvaluation {
void setUp() throws Exception {
HFileContextBuilder builder = new HFileContextBuilder()
- .withCompression(HFileWriterImpl.compressionByName(codec))
+ .withCompression(AbstractHFileWriter.compressionByName(codec))
.withBlockSize(RFILE_BLOCKSIZE);
if (cipher == "aes") {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 0622f55f008..00639cfc692 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -242,6 +242,7 @@ public class TestCacheOnWrite {
public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration();
this.conf.set("dfs.datanode.data.dir.perm", "700");
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
BLOOM_BLOCK_SIZE);
@@ -271,7 +272,12 @@ public class TestCacheOnWrite {
}
private void readStoreFile(boolean useTags) throws IOException {
- HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
+ AbstractHFileReader reader;
+ if (useTags) {
+ reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
+ } else {
+ reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
+ }
LOG.info("HFile information: " + reader);
HFileContext meta = new HFileContextBuilder().withCompression(compress)
.withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
@@ -372,6 +378,11 @@ public class TestCacheOnWrite {
}
private void writeStoreFile(boolean useTags) throws IOException {
+ if (useTags) {
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+ } else {
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
+ }
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
"test_cache_on_write");
HFileContext meta = new HFileContextBuilder().withCompression(compress)
@@ -411,6 +422,11 @@ public class TestCacheOnWrite {
private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
throws IOException, InterruptedException {
+ if (useTags) {
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+ } else {
+ TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
+ }
// TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction.
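
Note: this test mixes spellings, using the HFile.FORMAT_VERSION_KEY constant in setUp but the raw string in the two helpers. Assuming the constant is defined as "hfile.format.version", the two are interchangeable:

    conf.setInt(HFile.FORMAT_VERSION_KEY, 3); // preferred
    conf.setInt("hfile.format.version", 3);   // same key, spelled out
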
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
index cb56c7862d4..1b6731a5d5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
@@ -90,7 +90,7 @@ public class TestFixedFileTrailer {
@Test
public void testTrailer() throws IOException {
FixedFileTrailer t = new FixedFileTrailer(version,
- HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION);
+ HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
t.setDataIndexCount(3);
t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
@@ -123,7 +123,7 @@ public class TestFixedFileTrailer {
{
DataInputStream dis = new DataInputStream(bais);
FixedFileTrailer t2 = new FixedFileTrailer(version,
- HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION);
+ HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
t2.deserialize(dis);
assertEquals(-1, bais.read()); // Ensure we have read everything.
checkLoadedTrailer(version, t, t2);
@@ -172,7 +172,7 @@ public class TestFixedFileTrailer {
public void testTrailerForV2NonPBCompatibility() throws Exception {
if (version == 2) {
FixedFileTrailer t = new FixedFileTrailer(version,
- HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM);
+ HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
t.setDataIndexCount(3);
t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
t.setLastDataBlockOffset(291);
@@ -199,7 +199,7 @@ public class TestFixedFileTrailer {
{
DataInputStream dis = new DataInputStream(bais);
FixedFileTrailer t2 = new FixedFileTrailer(version,
- HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM);
+ HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
t2.deserialize(dis);
assertEquals(-1, bais.read()); // Ensure we have read everything.
checkLoadedTrailer(version, t, t2);
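
Note: both trailer tests reduce to a serialize/deserialize round trip keyed by a (major, minor) version pair. Condensed into a sketch of the happy path:

    FixedFileTrailer t = new FixedFileTrailer(2, HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
    t.setDataIndexCount(3);
    t.setEntryCount(((long) Integer.MAX_VALUE) + 1);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    t.serialize(new DataOutputStream(baos));

    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    FixedFileTrailer t2 = new FixedFileTrailer(2, HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
    t2.deserialize(dis);
    // t2 now mirrors t; the test asserts field equality and stream exhaustion.
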
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
index cf2aca57eaa..762584267ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
@@ -82,6 +82,8 @@ public class TestForceCacheImportantBlocks {
public static Collection