Revert "HBASE-Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc."
Revert because missing JIRA number
This reverts commit 691efc60f7
.
This commit is contained in:
parent
691efc60f7
commit
da61928246
|
@ -24,9 +24,9 @@ import org.apache.hadoop.hbase.Waiter.Predicate;
|
|||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileReaderV3;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterV3;
|
||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Reader;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
|
||||
|
@ -46,8 +46,8 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
|
|||
static {
|
||||
// These log level changes are only useful when running on a localhost
|
||||
// cluster.
|
||||
Logger.getLogger(HFileReaderImpl.class).setLevel(Level.TRACE);
|
||||
Logger.getLogger(HFileWriterImpl.class).setLevel(Level.TRACE);
|
||||
Logger.getLogger(HFileReaderV3.class).setLevel(Level.TRACE);
|
||||
Logger.getLogger(HFileWriterV3.class).setLevel(Level.TRACE);
|
||||
Logger.getLogger(SecureProtobufLogReader.class).setLevel(Level.TRACE);
|
||||
Logger.getLogger(SecureProtobufLogWriter.class).setLevel(Level.TRACE);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,352 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
|
||||
/**
|
||||
* Common functionality needed by all versions of {@link HFile} readers.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
|
||||
public abstract class AbstractHFileReader
|
||||
implements HFile.Reader, Configurable {
|
||||
/** Stream to read from. Does checksum verifications in file system */
|
||||
protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
|
||||
|
||||
/** The file system stream of the underlying {@link HFile} that
|
||||
* does not do checksum verification in the file system */
|
||||
protected FSDataInputStream istreamNoFsChecksum; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
|
||||
|
||||
/** Data block index reader keeping the root data index in memory */
|
||||
protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
|
||||
|
||||
/** Meta block index reader -- always single level */
|
||||
protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
|
||||
|
||||
protected final FixedFileTrailer trailer;
|
||||
|
||||
/** Filled when we read in the trailer. */
|
||||
protected final Compression.Algorithm compressAlgo;
|
||||
|
||||
/**
|
||||
* What kind of data block encoding should be used while reading, writing,
|
||||
* and handling cache.
|
||||
*/
|
||||
protected HFileDataBlockEncoder dataBlockEncoder =
|
||||
NoOpDataBlockEncoder.INSTANCE;
|
||||
|
||||
/** Last key in the file. Filled in when we read in the file info */
|
||||
protected byte [] lastKey = null;
|
||||
|
||||
/** Average key length read from file info */
|
||||
protected int avgKeyLen = -1;
|
||||
|
||||
/** Average value length read from file info */
|
||||
protected int avgValueLen = -1;
|
||||
|
||||
/** Key comparator */
|
||||
protected KVComparator comparator = new KVComparator();
|
||||
|
||||
/** Size of this file. */
|
||||
protected final long fileSize;
|
||||
|
||||
/** Block cache configuration. */
|
||||
protected final CacheConfig cacheConf;
|
||||
|
||||
/** Path of file */
|
||||
protected final Path path;
|
||||
|
||||
/** File name to be used for block names */
|
||||
protected final String name;
|
||||
|
||||
protected FileInfo fileInfo;
|
||||
|
||||
/** The filesystem used for accesing data */
|
||||
protected HFileSystem hfs;
|
||||
|
||||
protected Configuration conf;
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
|
||||
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
|
||||
final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
|
||||
final Configuration conf) {
|
||||
this.trailer = trailer;
|
||||
this.compressAlgo = trailer.getCompressionCodec();
|
||||
this.cacheConf = cacheConf;
|
||||
this.fileSize = fileSize;
|
||||
this.path = path;
|
||||
this.name = path.getName();
|
||||
this.hfs = hfs; // URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public static class BlockIndexNotLoadedException
|
||||
extends IllegalStateException {
|
||||
public BlockIndexNotLoadedException() {
|
||||
// Add a message in case anyone relies on it as opposed to class name.
|
||||
super("Block index not loaded");
|
||||
}
|
||||
}
|
||||
|
||||
protected String toStringFirstKey() {
|
||||
return KeyValue.keyToString(getFirstKey());
|
||||
}
|
||||
|
||||
protected String toStringLastKey() {
|
||||
return KeyValue.keyToString(getLastKey());
|
||||
}
|
||||
|
||||
public abstract boolean isFileInfoLoaded();
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "reader=" + path.toString() +
|
||||
(!isFileInfoLoaded()? "":
|
||||
", compression=" + compressAlgo.getName() +
|
||||
", cacheConf=" + cacheConf +
|
||||
", firstKey=" + toStringFirstKey() +
|
||||
", lastKey=" + toStringLastKey()) +
|
||||
", avgKeyLen=" + avgKeyLen +
|
||||
", avgValueLen=" + avgValueLen +
|
||||
", entries=" + trailer.getEntryCount() +
|
||||
", length=" + fileSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long length() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Scanner on this file. No seeks or reads are done on creation. Call
|
||||
* {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
|
||||
* nothing to clean up in a Scanner. Letting go of your references to the
|
||||
* scanner is sufficient. NOTE: Do not use this overload of getScanner for
|
||||
* compactions.
|
||||
*
|
||||
* @param cacheBlocks True if we should cache blocks read in by this scanner.
|
||||
* @param pread Use positional read rather than seek+read if true (pread is
|
||||
* better for random reads, seek+read is better scanning).
|
||||
* @return Scanner on this file.
|
||||
*/
|
||||
@Override
|
||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
|
||||
return getScanner(cacheBlocks, pread, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the first key in the file. May be null if file has no entries. Note
|
||||
* that this is not the first row key, but rather the byte form of the
|
||||
* first KeyValue.
|
||||
*/
|
||||
@Override
|
||||
public byte [] getFirstKey() {
|
||||
if (dataBlockIndexReader == null) {
|
||||
throw new BlockIndexNotLoadedException();
|
||||
}
|
||||
return dataBlockIndexReader.isEmpty() ? null
|
||||
: dataBlockIndexReader.getRootBlockKey(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
|
||||
* patch goes in to eliminate {@link KeyValue} here.
|
||||
*
|
||||
* @return the first row key, or null if the file is empty.
|
||||
*/
|
||||
@Override
|
||||
public byte[] getFirstRowKey() {
|
||||
byte[] firstKey = getFirstKey();
|
||||
if (firstKey == null)
|
||||
return null;
|
||||
return KeyValue.createKeyValueFromKey(firstKey).getRow();
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO left from {@link HFile} version 1: move this to StoreFile after
|
||||
* Ryan's patch goes in to eliminate {@link KeyValue} here.
|
||||
*
|
||||
* @return the last row key, or null if the file is empty.
|
||||
*/
|
||||
@Override
|
||||
public byte[] getLastRowKey() {
|
||||
byte[] lastKey = getLastKey();
|
||||
if (lastKey == null)
|
||||
return null;
|
||||
return KeyValue.createKeyValueFromKey(lastKey).getRow();
|
||||
}
|
||||
|
||||
/** @return number of KV entries in this HFile */
|
||||
@Override
|
||||
public long getEntries() {
|
||||
return trailer.getEntryCount();
|
||||
}
|
||||
|
||||
/** @return comparator */
|
||||
@Override
|
||||
public KVComparator getComparator() {
|
||||
return comparator;
|
||||
}
|
||||
|
||||
/** @return compression algorithm */
|
||||
@Override
|
||||
public Compression.Algorithm getCompressionAlgorithm() {
|
||||
return compressAlgo;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the total heap size of data and meta block indexes in bytes. Does
|
||||
* not take into account non-root blocks of a multilevel data index.
|
||||
*/
|
||||
public long indexSize() {
|
||||
return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
|
||||
+ ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
|
||||
: 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
|
||||
return dataBlockIndexReader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FixedFileTrailer getTrailer() {
|
||||
return trailer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileInfo loadFileInfo() throws IOException {
|
||||
return fileInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* An exception thrown when an operation requiring a scanner to be seeked
|
||||
* is invoked on a scanner that is not seeked.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public static class NotSeekedException extends IllegalStateException {
|
||||
public NotSeekedException() {
|
||||
super("Not seeked to a key/value");
|
||||
}
|
||||
}
|
||||
|
||||
protected static abstract class Scanner implements HFileScanner {
|
||||
protected ByteBuffer blockBuffer;
|
||||
|
||||
protected boolean cacheBlocks;
|
||||
protected final boolean pread;
|
||||
protected final boolean isCompaction;
|
||||
|
||||
protected int currKeyLen;
|
||||
protected int currValueLen;
|
||||
protected int currMemstoreTSLen;
|
||||
protected long currMemstoreTS;
|
||||
|
||||
protected int blockFetches;
|
||||
|
||||
protected final HFile.Reader reader;
|
||||
|
||||
public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
|
||||
final boolean pread, final boolean isCompaction) {
|
||||
this.reader = reader;
|
||||
this.cacheBlocks = cacheBlocks;
|
||||
this.pread = pread;
|
||||
this.isCompaction = isCompaction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSeeked(){
|
||||
return blockBuffer != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HFileScanner for reader " + String.valueOf(getReader());
|
||||
}
|
||||
|
||||
protected void assertSeeked() {
|
||||
if (!isSeeked())
|
||||
throw new NotSeekedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int seekTo(byte[] key) throws IOException {
|
||||
return seekTo(key, 0, key.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean seekBefore(byte[] key) throws IOException {
|
||||
return seekBefore(key, 0, key.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int reseekTo(byte[] key) throws IOException {
|
||||
return reseekTo(key, 0, key.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFile.Reader getReader() {
|
||||
return reader;
|
||||
}
|
||||
}
|
||||
|
||||
/** For testing */
|
||||
abstract HFileBlock.FSReader getUncachedBlockReader();
|
||||
|
||||
public Path getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataBlockEncoding getDataBlockEncoding() {
|
||||
return dataBlockEncoder.getDataBlockEncoding();
|
||||
}
|
||||
|
||||
public abstract int getMajorVersion();
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,266 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Common functionality needed by all versions of {@link HFile} writers.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class AbstractHFileWriter implements HFile.Writer {
|
||||
|
||||
/** The Cell previously appended. Becomes the last cell in the file.*/
|
||||
protected Cell lastCell = null;
|
||||
|
||||
/** FileSystem stream to write into. */
|
||||
protected FSDataOutputStream outputStream;
|
||||
|
||||
/** True if we opened the <code>outputStream</code> (and so will close it). */
|
||||
protected final boolean closeOutputStream;
|
||||
|
||||
/** A "file info" block: a key-value map of file-wide metadata. */
|
||||
protected FileInfo fileInfo = new HFile.FileInfo();
|
||||
|
||||
/** Total # of key/value entries, i.e. how many times add() was called. */
|
||||
protected long entryCount = 0;
|
||||
|
||||
/** Used for calculating the average key length. */
|
||||
protected long totalKeyLength = 0;
|
||||
|
||||
/** Used for calculating the average value length. */
|
||||
protected long totalValueLength = 0;
|
||||
|
||||
/** Total uncompressed bytes, maybe calculate a compression ratio later. */
|
||||
protected long totalUncompressedBytes = 0;
|
||||
|
||||
/** Key comparator. Used to ensure we write in order. */
|
||||
protected final KVComparator comparator;
|
||||
|
||||
/** Meta block names. */
|
||||
protected List<byte[]> metaNames = new ArrayList<byte[]>();
|
||||
|
||||
/** {@link Writable}s representing meta block data. */
|
||||
protected List<Writable> metaData = new ArrayList<Writable>();
|
||||
|
||||
/**
|
||||
* First cell in a block.
|
||||
* This reference should be short-lived since we write hfiles in a burst.
|
||||
*/
|
||||
protected Cell firstCellInBlock = null;
|
||||
|
||||
/** May be null if we were passed a stream. */
|
||||
protected final Path path;
|
||||
|
||||
|
||||
/** Cache configuration for caching data on write. */
|
||||
protected final CacheConfig cacheConf;
|
||||
|
||||
/**
|
||||
* Name for this object used when logging or in toString. Is either
|
||||
* the result of a toString on stream or else name of passed file Path.
|
||||
*/
|
||||
protected final String name;
|
||||
|
||||
/**
|
||||
* The data block encoding which will be used.
|
||||
* {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
|
||||
*/
|
||||
protected final HFileDataBlockEncoder blockEncoder;
|
||||
|
||||
protected final HFileContext hFileContext;
|
||||
|
||||
public AbstractHFileWriter(CacheConfig cacheConf,
|
||||
FSDataOutputStream outputStream, Path path,
|
||||
KVComparator comparator, HFileContext fileContext) {
|
||||
this.outputStream = outputStream;
|
||||
this.path = path;
|
||||
this.name = path != null ? path.getName() : outputStream.toString();
|
||||
this.hFileContext = fileContext;
|
||||
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
|
||||
if (encoding != DataBlockEncoding.NONE) {
|
||||
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
|
||||
} else {
|
||||
this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
|
||||
}
|
||||
this.comparator = comparator != null ? comparator
|
||||
: KeyValue.COMPARATOR;
|
||||
|
||||
closeOutputStream = path != null;
|
||||
this.cacheConf = cacheConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add last bits of metadata to file info before it is written out.
|
||||
*/
|
||||
protected void finishFileInfo() throws IOException {
|
||||
if (lastCell != null) {
|
||||
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
|
||||
// byte buffer. Won't take a tuple.
|
||||
byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
|
||||
fileInfo.append(FileInfo.LASTKEY, lastKey, false);
|
||||
}
|
||||
|
||||
// Average key length.
|
||||
int avgKeyLen =
|
||||
entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
|
||||
fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
|
||||
|
||||
// Average value length.
|
||||
int avgValueLen =
|
||||
entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
|
||||
fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
|
||||
|
||||
fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
|
||||
false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add to the file info. All added key/value pairs can be obtained using
|
||||
* {@link HFile.Reader#loadFileInfo()}.
|
||||
*
|
||||
* @param k Key
|
||||
* @param v Value
|
||||
* @throws IOException in case the key or the value are invalid
|
||||
*/
|
||||
@Override
|
||||
public void appendFileInfo(final byte[] k, final byte[] v)
|
||||
throws IOException {
|
||||
fileInfo.append(k, v, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the file info offset in the trailer, finishes up populating fields in
|
||||
* the file info, and writes the file info into the given data output. The
|
||||
* reason the data output is not always {@link #outputStream} is that we store
|
||||
* file info as a block in version 2.
|
||||
*
|
||||
* @param trailer fixed file trailer
|
||||
* @param out the data output to write the file info to
|
||||
* @throws IOException
|
||||
*/
|
||||
protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
|
||||
throws IOException {
|
||||
trailer.setFileInfoOffset(outputStream.getPos());
|
||||
finishFileInfo();
|
||||
fileInfo.write(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the given Cell's key does not violate the key order.
|
||||
*
|
||||
* @param cell Cell whose key to check.
|
||||
* @return true if the key is duplicate
|
||||
* @throws IOException if the key or the key order is wrong
|
||||
*/
|
||||
protected boolean checkKey(final Cell cell) throws IOException {
|
||||
boolean isDuplicateKey = false;
|
||||
|
||||
if (cell == null) {
|
||||
throw new IOException("Key cannot be null or empty");
|
||||
}
|
||||
if (lastCell != null) {
|
||||
int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
|
||||
|
||||
if (keyComp > 0) {
|
||||
throw new IOException("Added a key not lexically larger than"
|
||||
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
|
||||
} else if (keyComp == 0) {
|
||||
isDuplicateKey = true;
|
||||
}
|
||||
}
|
||||
return isDuplicateKey;
|
||||
}
|
||||
|
||||
/** Checks the given value for validity. */
|
||||
protected void checkValue(final byte[] value, final int offset,
|
||||
final int length) throws IOException {
|
||||
if (value == null) {
|
||||
throw new IOException("Value cannot be null");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Path or null if we were passed a stream rather than a Path.
|
||||
*/
|
||||
@Override
|
||||
public Path getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "writer=" + (path != null ? path.toString() : null) + ", name="
|
||||
+ name + ", compression=" + hFileContext.getCompression().getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets remaining trailer fields, writes the trailer to disk, and optionally
|
||||
* closes the output stream.
|
||||
*/
|
||||
protected void finishClose(FixedFileTrailer trailer) throws IOException {
|
||||
trailer.setMetaIndexCount(metaNames.size());
|
||||
trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
|
||||
trailer.setEntryCount(entryCount);
|
||||
trailer.setCompressionCodec(hFileContext.getCompression());
|
||||
|
||||
trailer.serialize(outputStream);
|
||||
|
||||
if (closeOutputStream) {
|
||||
outputStream.close();
|
||||
outputStream = null;
|
||||
}
|
||||
}
|
||||
|
||||
public static Compression.Algorithm compressionByName(String algoName) {
|
||||
if (algoName == null)
|
||||
return HFile.DEFAULT_COMPRESSION_ALGORITHM;
|
||||
return Compression.getCompressionAlgorithmByName(algoName);
|
||||
}
|
||||
|
||||
/** A helper method to create HFile output streams in constructors */
|
||||
protected static FSDataOutputStream createOutputStream(Configuration conf,
|
||||
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
|
||||
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
|
||||
HConstants.DATA_FILE_UMASK_KEY);
|
||||
return FSUtils.create(fs, path, perms, favoredNodes);
|
||||
}
|
||||
}
|
|
@ -238,7 +238,7 @@ public class FixedFileTrailer {
|
|||
BlockType.TRAILER.readAndCheck(inputStream);
|
||||
|
||||
if (majorVersion > 2
|
||||
|| (majorVersion == 2 && minorVersion >= HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION)) {
|
||||
|| (majorVersion == 2 && minorVersion >= HFileReaderV2.PBUF_TRAILER_MINOR_VERSION)) {
|
||||
deserializeFromPB(inputStream);
|
||||
} else {
|
||||
deserializeFromWritable(inputStream);
|
||||
|
@ -611,9 +611,7 @@ public class FixedFileTrailer {
|
|||
}
|
||||
|
||||
public byte[] getEncryptionKey() {
|
||||
// This is a v3 feature but if reading a v2 file the encryptionKey will just be null which
|
||||
// if fine for this feature.
|
||||
expectAtLeastMajorVersion(2);
|
||||
expectAtLeastMajorVersion(3);
|
||||
return encryptionKey;
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
|
@ -197,8 +196,6 @@ public class HFile {
|
|||
|
||||
/** API required to write an {@link HFile} */
|
||||
public interface Writer extends Closeable {
|
||||
/** Max memstore (mvcc) timestamp in FileInfo */
|
||||
public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
|
||||
|
||||
/** Add an element to the file info map. */
|
||||
void appendFileInfo(byte[] key, byte[] value) throws IOException;
|
||||
|
@ -296,7 +293,7 @@ public class HFile {
|
|||
"filesystem/path or path");
|
||||
}
|
||||
if (path != null) {
|
||||
ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
|
||||
ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
|
||||
}
|
||||
return createWriter(fs, path, ostream,
|
||||
comparator, fileContext);
|
||||
|
@ -335,12 +332,9 @@ public class HFile {
|
|||
int version = getFormatVersion(conf);
|
||||
switch (version) {
|
||||
case 2:
|
||||
throw new IllegalArgumentException("This should never happen. " +
|
||||
"Did you change hfile.format.version to read v2? This version of the software writes v3" +
|
||||
" hfiles only (but it can read v2 files without having to update hfile.format.version " +
|
||||
"in hbase-site.xml)");
|
||||
return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
|
||||
case 3:
|
||||
return new HFileWriterFactory(conf, cacheConf);
|
||||
return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
|
||||
default:
|
||||
throw new IllegalArgumentException("Cannot create writer for HFile " +
|
||||
"format version " + version);
|
||||
|
@ -445,18 +439,6 @@ public class HFile {
|
|||
* Return the file context of the HFile this reader belongs to
|
||||
*/
|
||||
HFileContext getFileContext();
|
||||
|
||||
boolean shouldIncludeMemstoreTS();
|
||||
|
||||
boolean isDecodeMemstoreTS();
|
||||
|
||||
DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
|
||||
|
||||
@VisibleForTesting
|
||||
HFileBlock.FSReader getUncachedBlockReader();
|
||||
|
||||
@VisibleForTesting
|
||||
boolean prefetchComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -480,10 +462,9 @@ public class HFile {
|
|||
trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
|
||||
switch (trailer.getMajorVersion()) {
|
||||
case 2:
|
||||
LOG.debug("Opening HFile v2 with v3 reader");
|
||||
// Fall through.
|
||||
return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
|
||||
case 3 :
|
||||
return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
|
||||
return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
|
||||
}
|
||||
|
@ -507,7 +488,6 @@ public class HFile {
|
|||
* @return A version specific Hfile Reader
|
||||
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
|
||||
*/
|
||||
@SuppressWarnings("resource")
|
||||
public static Reader createReader(FileSystem fs, Path path,
|
||||
FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
|
||||
throws IOException {
|
||||
|
@ -873,18 +853,6 @@ public class HFile {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static void checkHFileVersion(final Configuration c) {
|
||||
int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
|
||||
if (version < MAX_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
|
||||
throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY +
|
||||
" (in your hbase-*.xml files) is " + version + " which does not match " +
|
||||
MAX_FORMAT_VERSION +
|
||||
"; are you running with a configuration from an older or newer hbase install (an " +
|
||||
"incompatible hbase-default.xml or hbase-site.xml on your CLASSPATH)?");
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// delegate to preserve old behavior
|
||||
HFilePrettyPrinter.main(args);
|
||||
|
|
|
@ -1257,40 +1257,13 @@ public class HFileBlock implements Cacheable {
|
|||
|
||||
/** Get the default decoder for blocks from this file. */
|
||||
HFileBlockDecodingContext getDefaultBlockDecodingContext();
|
||||
|
||||
void setIncludesMemstoreTS(boolean includesMemstoreTS);
|
||||
void setDataBlockEncoder(HFileDataBlockEncoder encoder);
|
||||
}
|
||||
|
||||
/**
|
||||
* We always prefetch the header of the next block, so that we know its
|
||||
* on-disk size in advance and can read it in one operation.
|
||||
* A common implementation of some methods of {@link FSReader} and some
|
||||
* tools for implementing HFile format version-specific block readers.
|
||||
*/
|
||||
private static class PrefetchedHeader {
|
||||
long offset = -1;
|
||||
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
|
||||
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
|
||||
}
|
||||
|
||||
/** Reads version 2 blocks from the filesystem. */
|
||||
static class FSReaderImpl implements FSReader {
|
||||
/** The file system stream of the underlying {@link HFile} that
|
||||
* does or doesn't do checksum validations in the filesystem */
|
||||
protected FSDataInputStreamWrapper streamWrapper;
|
||||
|
||||
private HFileBlockDecodingContext encodedBlockDecodingCtx;
|
||||
|
||||
/** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
|
||||
private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
|
||||
|
||||
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
|
||||
new ThreadLocal<PrefetchedHeader>() {
|
||||
@Override
|
||||
public PrefetchedHeader initialValue() {
|
||||
return new PrefetchedHeader();
|
||||
}
|
||||
};
|
||||
|
||||
private abstract static class AbstractFSReader implements FSReader {
|
||||
/** Compression algorithm used by the {@link HFile} */
|
||||
|
||||
/** The size of the file we are reading from, or -1 if unknown. */
|
||||
|
@ -1312,31 +1285,18 @@ public class HFileBlock implements Cacheable {
|
|||
|
||||
protected HFileContext fileContext;
|
||||
|
||||
public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
|
||||
HFileContext fileContext) throws IOException {
|
||||
public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
|
||||
throws IOException {
|
||||
this.fileSize = fileSize;
|
||||
this.hfs = hfs;
|
||||
this.path = path;
|
||||
this.fileContext = fileContext;
|
||||
this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
|
||||
|
||||
this.streamWrapper = stream;
|
||||
// Older versions of HBase didn't support checksum.
|
||||
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
|
||||
defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
|
||||
encodedBlockDecodingCtx = defaultDecodingCtx;
|
||||
}
|
||||
|
||||
/**
|
||||
* A constructor that reads files with the latest minor version.
|
||||
* This is used by unit tests only.
|
||||
*/
|
||||
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
|
||||
throws IOException {
|
||||
this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
|
||||
}
|
||||
|
||||
public BlockIterator blockRange(final long startOffset, final long endOffset) {
|
||||
@Override
|
||||
public BlockIterator blockRange(final long startOffset,
|
||||
final long endOffset) {
|
||||
final FSReader owner = this; // handle for inner class
|
||||
return new BlockIterator() {
|
||||
private long offset = startOffset;
|
||||
|
@ -1433,6 +1393,56 @@ public class HFileBlock implements Cacheable {
|
|||
return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* We always prefetch the header of the next block, so that we know its
|
||||
* on-disk size in advance and can read it in one operation.
|
||||
*/
|
||||
private static class PrefetchedHeader {
|
||||
long offset = -1;
|
||||
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
|
||||
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
|
||||
}
|
||||
|
||||
/** Reads version 2 blocks from the filesystem. */
|
||||
static class FSReaderImpl extends AbstractFSReader {
|
||||
/** The file system stream of the underlying {@link HFile} that
|
||||
* does or doesn't do checksum validations in the filesystem */
|
||||
protected FSDataInputStreamWrapper streamWrapper;
|
||||
|
||||
private HFileBlockDecodingContext encodedBlockDecodingCtx;
|
||||
|
||||
/** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
|
||||
private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
|
||||
|
||||
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
|
||||
new ThreadLocal<PrefetchedHeader>() {
|
||||
@Override
|
||||
public PrefetchedHeader initialValue() {
|
||||
return new PrefetchedHeader();
|
||||
}
|
||||
};
|
||||
|
||||
public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
|
||||
HFileContext fileContext) throws IOException {
|
||||
super(fileSize, hfs, path, fileContext);
|
||||
this.streamWrapper = stream;
|
||||
// Older versions of HBase didn't support checksum.
|
||||
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
|
||||
defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
|
||||
encodedBlockDecodingCtx = defaultDecodingCtx;
|
||||
}
|
||||
|
||||
/**
|
||||
* A constructor that reads files with the latest minor version.
|
||||
* This is used by unit tests only.
|
||||
*/
|
||||
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
|
||||
throws IOException {
|
||||
this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
|
||||
* little memory allocation as possible, using the provided on-disk size.
|
||||
|
@ -1673,11 +1683,11 @@ public class HFileBlock implements Cacheable {
|
|||
return b;
|
||||
}
|
||||
|
||||
public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
|
||||
void setIncludesMemstoreTS(boolean includesMemstoreTS) {
|
||||
this.fileContext.setIncludesMvcc(includesMemstoreTS);
|
||||
}
|
||||
|
||||
public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
|
||||
void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
|
||||
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
|
||||
}
|
||||
|
||||
|
|
|
@ -54,9 +54,9 @@ import org.apache.hadoop.util.StringUtils;
|
|||
* ({@link BlockIndexReader}) single-level and multi-level block indexes.
|
||||
*
|
||||
* Examples of how to use the block index writer can be found in
|
||||
* {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
|
||||
* {@link HFileWriterImpl}. Examples of how to use the reader can be
|
||||
* found in {@link HFileWriterImpl} and TestHFileBlockIndex.
|
||||
* {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter}
|
||||
* and {@link HFileWriterV2}. Examples of how to use the reader can be
|
||||
* found in {@link HFileReaderV2} and TestHFileBlockIndex.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileBlockIndex {
|
||||
|
@ -193,7 +193,7 @@ public class HFileBlockIndex {
|
|||
* Return the BlockWithScanInfo which contains the DataBlock with other scan
|
||||
* info such as nextIndexedKey. This function will only be called when the
|
||||
* HFile version is larger than 1.
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* the key we are looking for
|
||||
* @param currentBlock
|
||||
|
@ -494,7 +494,7 @@ public class HFileBlockIndex {
|
|||
* Performs a binary search over a non-root level index block. Utilizes the
|
||||
* secondary index, which records the offsets of (offset, onDiskSize,
|
||||
* firstKey) tuples of all entries.
|
||||
*
|
||||
*
|
||||
* @param key
|
||||
* the key we are searching for offsets to individual entries in
|
||||
* the blockIndex buffer
|
||||
|
@ -641,7 +641,7 @@ public class HFileBlockIndex {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Read in the root-level index from the given input stream. Must match
|
||||
* what was written into the root level by
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,358 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.Key;
|
||||
import java.security.KeyException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
||||
/**
|
||||
* {@link HFile} reader for version 3.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileReaderV3 extends HFileReaderV2 {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HFileReaderV3.class);
|
||||
|
||||
public static final int MAX_MINOR_VERSION = 0;
|
||||
|
||||
/**
|
||||
* Opens a HFile. You must load the index before you can use it by calling
|
||||
* {@link #loadFileInfo()}.
|
||||
* @param path
|
||||
* Path to HFile.
|
||||
* @param trailer
|
||||
* File trailer.
|
||||
* @param fsdis
|
||||
* input stream.
|
||||
* @param size
|
||||
* Length of the stream.
|
||||
* @param cacheConf
|
||||
* Cache configuration.
|
||||
* @param hfs
|
||||
* The file system.
|
||||
* @param conf
|
||||
* Configuration
|
||||
*/
|
||||
public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
|
||||
final long size, final CacheConfig cacheConf, final HFileSystem hfs,
|
||||
final Configuration conf) throws IOException {
|
||||
super(path, trailer, fsdis, size, cacheConf, hfs, conf);
|
||||
byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
|
||||
// max tag length is not present in the HFile means tags were not at all written to file.
|
||||
if (tmp != null) {
|
||||
hfileContext.setIncludesTags(true);
|
||||
tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
|
||||
if (tmp != null && Bytes.toBoolean(tmp)) {
|
||||
hfileContext.setCompressTags(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
|
||||
HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
|
||||
trailer.expectMajorVersion(3);
|
||||
HFileContextBuilder builder = new HFileContextBuilder()
|
||||
.withIncludesMvcc(this.includesMemstoreTS)
|
||||
.withHBaseCheckSum(true)
|
||||
.withCompression(this.compressAlgo);
|
||||
|
||||
// Check for any key material available
|
||||
byte[] keyBytes = trailer.getEncryptionKey();
|
||||
if (keyBytes != null) {
|
||||
Encryption.Context cryptoContext = Encryption.newContext(conf);
|
||||
Key key;
|
||||
String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
|
||||
User.getCurrent().getShortName());
|
||||
try {
|
||||
// First try the master key
|
||||
key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
|
||||
} catch (KeyException e) {
|
||||
// If the current master key fails to unwrap, try the alternate, if
|
||||
// one is configured
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
|
||||
}
|
||||
String alternateKeyName =
|
||||
conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
|
||||
if (alternateKeyName != null) {
|
||||
try {
|
||||
key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
|
||||
} catch (KeyException ex) {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
} else {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
// Use the algorithm the key wants
|
||||
Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
|
||||
if (cipher == null) {
|
||||
throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
|
||||
}
|
||||
cryptoContext.setCipher(cipher);
|
||||
cryptoContext.setKey(key);
|
||||
builder.withEncryptionContext(cryptoContext);
|
||||
}
|
||||
|
||||
HFileContext context = builder.build();
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Reader" + (path != null ? " for " + path : "" ) +
|
||||
" initialized with cacheConf: " + cacheConf +
|
||||
" comparator: " + comparator.getClass().getSimpleName() +
|
||||
" fileContext: " + context);
|
||||
}
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Scanner on this file. No seeks or reads are done on creation. Call
|
||||
* {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
|
||||
* nothing to clean up in a Scanner. Letting go of your references to the
|
||||
* scanner is sufficient.
|
||||
* @param cacheBlocks
|
||||
* True if we should cache blocks read in by this scanner.
|
||||
* @param pread
|
||||
* Use positional read rather than seek+read if true (pread is better
|
||||
* for random reads, seek+read is better scanning).
|
||||
* @param isCompaction
|
||||
* is scanner being used for a compaction?
|
||||
* @return Scanner on this file.
|
||||
*/
|
||||
@Override
|
||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
|
||||
final boolean isCompaction) {
|
||||
if (dataBlockEncoder.useEncodedScanner()) {
|
||||
return new EncodedScannerV3(this, cacheBlocks, pread, isCompaction, this.hfileContext);
|
||||
}
|
||||
return new ScannerV3(this, cacheBlocks, pread, isCompaction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of {@link HFileScanner} interface.
|
||||
*/
|
||||
protected static class ScannerV3 extends ScannerV2 {
|
||||
|
||||
private HFileReaderV3 reader;
|
||||
private int currTagsLen;
|
||||
|
||||
public ScannerV3(HFileReaderV3 r, boolean cacheBlocks, final boolean pread,
|
||||
final boolean isCompaction) {
|
||||
super(r, cacheBlocks, pread, isCompaction);
|
||||
this.reader = r;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getCellBufSize() {
|
||||
int kvBufSize = super.getCellBufSize();
|
||||
if (reader.hfileContext.isIncludesTags()) {
|
||||
kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen;
|
||||
}
|
||||
return kvBufSize;
|
||||
}
|
||||
|
||||
protected void setNonSeekedState() {
|
||||
super.setNonSeekedState();
|
||||
currTagsLen = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getNextCellStartPosition() {
|
||||
int nextKvPos = super.getNextCellStartPosition();
|
||||
if (reader.hfileContext.isIncludesTags()) {
|
||||
nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
|
||||
}
|
||||
return nextKvPos;
|
||||
}
|
||||
|
||||
protected void readKeyValueLen() {
|
||||
blockBuffer.mark();
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit()
|
||||
|| currValueLen > blockBuffer.limit()) {
|
||||
throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen "
|
||||
+ currValueLen + ". Block offset: "
|
||||
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
|
||||
+ blockBuffer.position() + " (without header).");
|
||||
}
|
||||
ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
|
||||
if (reader.hfileContext.isIncludesTags()) {
|
||||
// Read short as unsigned, high byte first
|
||||
currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
|
||||
if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
|
||||
throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
|
||||
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
|
||||
+ blockBuffer.position() + " (without header).");
|
||||
}
|
||||
ByteBufferUtils.skip(blockBuffer, currTagsLen);
|
||||
}
|
||||
readMvccVersion();
|
||||
blockBuffer.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Within a loaded block, seek looking for the last key that is smaller than
|
||||
* (or equal to?) the key we are interested in.
|
||||
* A note on the seekBefore: if you have seekBefore = true, AND the first
|
||||
* key in the block = key, then you'll get thrown exceptions. The caller has
|
||||
* to check for that case and load the previous block as appropriate.
|
||||
* @param key
|
||||
* the key to find
|
||||
* @param seekBefore
|
||||
* find the key before the given key in case of exact match.
|
||||
* @return 0 in case of an exact key match, 1 in case of an inexact match,
|
||||
* -2 in case of an inexact match and furthermore, the input key
|
||||
* less than the first key of current block(e.g. using a faked index
|
||||
* key)
|
||||
*/
|
||||
@Override
|
||||
protected int blockSeek(Cell key, boolean seekBefore) {
|
||||
int klen, vlen, tlen = 0;
|
||||
long memstoreTS = 0;
|
||||
int memstoreTSLen = 0;
|
||||
int lastKeyValueSize = -1;
|
||||
KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
|
||||
do {
|
||||
blockBuffer.mark();
|
||||
klen = blockBuffer.getInt();
|
||||
vlen = blockBuffer.getInt();
|
||||
if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
|
||||
|| vlen > blockBuffer.limit()) {
|
||||
throw new IllegalStateException("Invalid klen " + klen + " or vlen "
|
||||
+ vlen + ". Block offset: "
|
||||
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
|
||||
+ blockBuffer.position() + " (without header).");
|
||||
}
|
||||
ByteBufferUtils.skip(blockBuffer, klen + vlen);
|
||||
if (reader.hfileContext.isIncludesTags()) {
|
||||
// Read short as unsigned, high byte first
|
||||
tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
|
||||
if (tlen < 0 || tlen > blockBuffer.limit()) {
|
||||
throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
|
||||
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
|
||||
+ blockBuffer.position() + " (without header).");
|
||||
}
|
||||
ByteBufferUtils.skip(blockBuffer, tlen);
|
||||
}
|
||||
if (this.reader.shouldIncludeMemstoreTS()) {
|
||||
if (this.reader.decodeMemstoreTS) {
|
||||
try {
|
||||
memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
|
||||
+ blockBuffer.position());
|
||||
memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error reading memstore timestamp", e);
|
||||
}
|
||||
} else {
|
||||
memstoreTS = 0;
|
||||
memstoreTSLen = 1;
|
||||
}
|
||||
}
|
||||
blockBuffer.reset();
|
||||
int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
|
||||
keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
|
||||
int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv);
|
||||
|
||||
if (comp == 0) {
|
||||
if (seekBefore) {
|
||||
if (lastKeyValueSize < 0) {
|
||||
throw new IllegalStateException("blockSeek with seekBefore "
|
||||
+ "at the first key of the block: key="
|
||||
+ CellUtil.getCellKeyAsString(key)
|
||||
+ ", blockOffset=" + block.getOffset() + ", onDiskSize="
|
||||
+ block.getOnDiskSizeWithHeader());
|
||||
}
|
||||
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
|
||||
readKeyValueLen();
|
||||
return 1; // non exact match.
|
||||
}
|
||||
currKeyLen = klen;
|
||||
currValueLen = vlen;
|
||||
currTagsLen = tlen;
|
||||
if (this.reader.shouldIncludeMemstoreTS()) {
|
||||
currMemstoreTS = memstoreTS;
|
||||
currMemstoreTSLen = memstoreTSLen;
|
||||
}
|
||||
return 0; // indicate exact match
|
||||
} else if (comp < 0) {
|
||||
if (lastKeyValueSize > 0)
|
||||
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
|
||||
readKeyValueLen();
|
||||
if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
|
||||
return HConstants.INDEX_KEY_MAGIC;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
// The size of this key/value tuple, including key/value length fields.
|
||||
lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
|
||||
// include tag length also if tags included with KV
|
||||
if (reader.hfileContext.isIncludesTags()) {
|
||||
lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
|
||||
}
|
||||
blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
|
||||
} while (blockBuffer.remaining() > 0);
|
||||
|
||||
// Seek to the last key we successfully read. This will happen if this is
|
||||
// the last key/value pair in the file, in which case the following call
|
||||
// to next() has to return false.
|
||||
blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
|
||||
readKeyValueLen();
|
||||
return 1; // didn't exactly find it.
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ScannerV3 that operates on encoded data blocks.
|
||||
*/
|
||||
protected static class EncodedScannerV3 extends EncodedScannerV2 {
|
||||
public EncodedScannerV3(HFileReaderV3 reader, boolean cacheBlocks, boolean pread,
|
||||
boolean isCompaction, HFileContext context) {
|
||||
super(reader, cacheBlocks, pread, isCompaction, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMajorVersion() {
|
||||
return 3;
|
||||
}
|
||||
}
|
|
@ -1,40 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitationsME
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
|
||||
public class HFileWriterFactory extends HFile.WriterFactory {
|
||||
HFileWriterFactory(Configuration conf, CacheConfig cacheConf) {
|
||||
super(conf, cacheConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFile.Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
|
||||
KVComparator comparator, HFileContext context)
|
||||
throws IOException {
|
||||
return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, context);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -21,7 +22,6 @@ package org.apache.hadoop.hbase.io.hfile;
|
|||
import java.io.DataOutput;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -31,105 +31,40 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Common functionality needed by all versions of {@link HFile} writers.
|
||||
* Writes HFile format version 2.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileWriterImpl implements HFile.Writer {
|
||||
private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
|
||||
public class HFileWriterV2 extends AbstractHFileWriter {
|
||||
static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
|
||||
|
||||
/** The Cell previously appended. Becomes the last cell in the file.*/
|
||||
protected Cell lastCell = null;
|
||||
|
||||
/** FileSystem stream to write into. */
|
||||
protected FSDataOutputStream outputStream;
|
||||
|
||||
/** True if we opened the <code>outputStream</code> (and so will close it). */
|
||||
protected final boolean closeOutputStream;
|
||||
|
||||
/** A "file info" block: a key-value map of file-wide metadata. */
|
||||
protected FileInfo fileInfo = new HFile.FileInfo();
|
||||
|
||||
/** Total # of key/value entries, i.e. how many times add() was called. */
|
||||
protected long entryCount = 0;
|
||||
|
||||
/** Used for calculating the average key length. */
|
||||
protected long totalKeyLength = 0;
|
||||
|
||||
/** Used for calculating the average value length. */
|
||||
protected long totalValueLength = 0;
|
||||
|
||||
/** Total uncompressed bytes, maybe calculate a compression ratio later. */
|
||||
protected long totalUncompressedBytes = 0;
|
||||
|
||||
/** Key comparator. Used to ensure we write in order. */
|
||||
protected final KVComparator comparator;
|
||||
|
||||
/** Meta block names. */
|
||||
protected List<byte[]> metaNames = new ArrayList<byte[]>();
|
||||
|
||||
/** {@link Writable}s representing meta block data. */
|
||||
protected List<Writable> metaData = new ArrayList<Writable>();
|
||||
|
||||
/**
|
||||
* First cell in a block.
|
||||
* This reference should be short-lived since we write hfiles in a burst.
|
||||
*/
|
||||
protected Cell firstCellInBlock = null;
|
||||
|
||||
|
||||
/** May be null if we were passed a stream. */
|
||||
protected final Path path;
|
||||
|
||||
/** Cache configuration for caching data on write. */
|
||||
protected final CacheConfig cacheConf;
|
||||
|
||||
/**
|
||||
* Name for this object used when logging or in toString. Is either
|
||||
* the result of a toString on stream or else name of passed file Path.
|
||||
*/
|
||||
protected final String name;
|
||||
|
||||
/**
|
||||
* The data block encoding which will be used.
|
||||
* {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
|
||||
*/
|
||||
protected final HFileDataBlockEncoder blockEncoder;
|
||||
|
||||
protected final HFileContext hFileContext;
|
||||
|
||||
private int maxTagsLength = 0;
|
||||
/** Max memstore (mvcc) timestamp in FileInfo */
|
||||
public static final byte [] MAX_MEMSTORE_TS_KEY =
|
||||
Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
|
||||
|
||||
/** KeyValue version in FileInfo */
|
||||
public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
|
||||
public static final byte [] KEY_VALUE_VERSION =
|
||||
Bytes.toBytes("KEY_VALUE_VERSION");
|
||||
|
||||
/** Version for KeyValue which includes memstore timestamp */
|
||||
public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
|
||||
|
||||
/** Inline block writers for multi-level block index and compound Blooms. */
|
||||
private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<InlineBlockWriter>();
|
||||
private List<InlineBlockWriter> inlineBlockWriters =
|
||||
new ArrayList<InlineBlockWriter>();
|
||||
|
||||
/** block writer */
|
||||
/** Unified version 2 block writer */
|
||||
protected HFileBlock.Writer fsBlockWriter;
|
||||
|
||||
private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
|
||||
|
@ -148,135 +83,40 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
private Cell lastCellOfPreviousBlock = null;
|
||||
|
||||
/** Additional data items to be written to the "load-on-open" section. */
|
||||
private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<BlockWritable>();
|
||||
private List<BlockWritable> additionalLoadOnOpenData =
|
||||
new ArrayList<BlockWritable>();
|
||||
|
||||
protected long maxMemstoreTS = 0;
|
||||
|
||||
public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
|
||||
FSDataOutputStream outputStream,
|
||||
KVComparator comparator, HFileContext fileContext) {
|
||||
this.outputStream = outputStream;
|
||||
this.path = path;
|
||||
this.name = path != null ? path.getName() : outputStream.toString();
|
||||
this.hFileContext = fileContext;
|
||||
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
|
||||
if (encoding != DataBlockEncoding.NONE) {
|
||||
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
|
||||
} else {
|
||||
this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
|
||||
static class WriterFactoryV2 extends HFile.WriterFactory {
|
||||
WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
|
||||
super(conf, cacheConf);
|
||||
}
|
||||
this.comparator = comparator != null ? comparator
|
||||
: KeyValue.COMPARATOR;
|
||||
|
||||
closeOutputStream = path != null;
|
||||
this.cacheConf = cacheConf;
|
||||
finishInit(conf);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Writer" + (path != null ? " for " + path : "") +
|
||||
" initialized with cacheConf: " + cacheConf +
|
||||
" comparator: " + comparator.getClass().getSimpleName() +
|
||||
" fileContext: " + fileContext);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add to the file info. All added key/value pairs can be obtained using
|
||||
* {@link HFile.Reader#loadFileInfo()}.
|
||||
*
|
||||
* @param k Key
|
||||
* @param v Value
|
||||
* @throws IOException in case the key or the value are invalid
|
||||
*/
|
||||
@Override
|
||||
public void appendFileInfo(final byte[] k, final byte[] v)
|
||||
throws IOException {
|
||||
fileInfo.append(k, v, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the file info offset in the trailer, finishes up populating fields in
|
||||
* the file info, and writes the file info into the given data output. The
|
||||
* reason the data output is not always {@link #outputStream} is that we store
|
||||
* file info as a block in version 2.
|
||||
*
|
||||
* @param trailer fixed file trailer
|
||||
* @param out the data output to write the file info to
|
||||
* @throws IOException
|
||||
*/
|
||||
protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
|
||||
throws IOException {
|
||||
trailer.setFileInfoOffset(outputStream.getPos());
|
||||
finishFileInfo();
|
||||
fileInfo.write(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the given Cell's key does not violate the key order.
|
||||
*
|
||||
* @param cell Cell whose key to check.
|
||||
* @return true if the key is duplicate
|
||||
* @throws IOException if the key or the key order is wrong
|
||||
*/
|
||||
protected boolean checkKey(final Cell cell) throws IOException {
|
||||
boolean isDuplicateKey = false;
|
||||
|
||||
if (cell == null) {
|
||||
throw new IOException("Key cannot be null or empty");
|
||||
}
|
||||
if (lastCell != null) {
|
||||
int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
|
||||
|
||||
if (keyComp > 0) {
|
||||
throw new IOException("Added a key not lexically larger than"
|
||||
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
|
||||
} else if (keyComp == 0) {
|
||||
isDuplicateKey = true;
|
||||
@Override
|
||||
public Writer createWriter(FileSystem fs, Path path,
|
||||
FSDataOutputStream ostream,
|
||||
KVComparator comparator, HFileContext context) throws IOException {
|
||||
context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
|
||||
return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
|
||||
comparator, context);
|
||||
}
|
||||
}
|
||||
return isDuplicateKey;
|
||||
}
|
||||
|
||||
/** Checks the given value for validity. */
|
||||
protected void checkValue(final byte[] value, final int offset,
|
||||
final int length) throws IOException {
|
||||
if (value == null) {
|
||||
throw new IOException("Value cannot be null");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Path or null if we were passed a stream rather than a Path.
|
||||
*/
|
||||
@Override
|
||||
public Path getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "writer=" + (path != null ? path.toString() : null) + ", name="
|
||||
+ name + ", compression=" + hFileContext.getCompression().getName();
|
||||
}
|
||||
|
||||
public static Compression.Algorithm compressionByName(String algoName) {
|
||||
if (algoName == null)
|
||||
return HFile.DEFAULT_COMPRESSION_ALGORITHM;
|
||||
return Compression.getCompressionAlgorithmByName(algoName);
|
||||
}
|
||||
|
||||
/** A helper method to create HFile output streams in constructors */
|
||||
protected static FSDataOutputStream createOutputStream(Configuration conf,
|
||||
FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
|
||||
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
|
||||
HConstants.DATA_FILE_UMASK_KEY);
|
||||
return FSUtils.create(fs, path, perms, favoredNodes);
|
||||
/** Constructor that takes a path, creates and closes the output stream. */
|
||||
public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
|
||||
FileSystem fs, Path path, FSDataOutputStream ostream,
|
||||
final KVComparator comparator, final HFileContext context) throws IOException {
|
||||
super(cacheConf,
|
||||
ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
|
||||
path, comparator, context);
|
||||
finishInit(conf);
|
||||
}
|
||||
|
||||
/** Additional initialization steps */
|
||||
protected void finishInit(final Configuration conf) {
|
||||
if (fsBlockWriter != null) {
|
||||
if (fsBlockWriter != null)
|
||||
throw new IllegalStateException("finishInit called twice");
|
||||
}
|
||||
|
||||
fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
|
||||
|
||||
|
@ -300,7 +140,9 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
* @throws IOException
|
||||
*/
|
||||
protected void checkBlockBoundary() throws IOException {
|
||||
if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return;
|
||||
if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
|
||||
return;
|
||||
|
||||
finishBlock();
|
||||
writeInlineBlocks(false);
|
||||
newBlock();
|
||||
|
@ -308,7 +150,8 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
|
||||
/** Clean up the current data block */
|
||||
private void finishBlock() throws IOException {
|
||||
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return;
|
||||
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
|
||||
return;
|
||||
|
||||
// Update the first data block offset for scanning.
|
||||
if (firstDataBlockOffset == -1) {
|
||||
|
@ -318,6 +161,7 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
lastDataBlockOffset = outputStream.getPos();
|
||||
fsBlockWriter.writeHeaderAndData(outputStream);
|
||||
int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
|
||||
|
||||
Cell indexEntry =
|
||||
CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
|
||||
dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
|
||||
|
@ -355,7 +199,8 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
*/
|
||||
private void doCacheOnWrite(long offset) {
|
||||
HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
|
||||
cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock);
|
||||
cacheConf.getBlockCache().cacheBlock(
|
||||
new BlockCacheKey(name, offset), cacheFormatBlock);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -399,6 +244,48 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
metaData.add(i, content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param cell Cell to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void append(final Cell cell) throws IOException {
|
||||
byte[] value = cell.getValueArray();
|
||||
int voffset = cell.getValueOffset();
|
||||
int vlength = cell.getValueLength();
|
||||
// checkKey uses comparator to check we are writing in order.
|
||||
boolean dupKey = checkKey(cell);
|
||||
checkValue(value, voffset, vlength);
|
||||
if (!dupKey) {
|
||||
checkBlockBoundary();
|
||||
}
|
||||
|
||||
if (!fsBlockWriter.isWriting()) {
|
||||
newBlock();
|
||||
}
|
||||
|
||||
fsBlockWriter.write(cell);
|
||||
|
||||
totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
|
||||
totalValueLength += vlength;
|
||||
|
||||
// Are we the first key in this block?
|
||||
if (firstCellInBlock == null) {
|
||||
// If cell is big, block will be closed and this firstCellInBlock reference will only last
|
||||
// a short while.
|
||||
firstCellInBlock = cell;
|
||||
}
|
||||
|
||||
// TODO: What if cell is 10MB and we write infrequently? We'll hold on to the cell here
|
||||
// indefinetly?
|
||||
lastCell = cell;
|
||||
entryCount++;
|
||||
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (outputStream == null) {
|
||||
|
@ -522,120 +409,16 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
});
|
||||
}
|
||||
|
||||
protected int getMajorVersion() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
protected int getMinorVersion() {
|
||||
return HFileReaderV2.MAX_MINOR_VERSION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileContext getFileContext() {
|
||||
return hFileContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param cell
|
||||
* Cell to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void append(final Cell cell) throws IOException {
|
||||
byte[] value = cell.getValueArray();
|
||||
int voffset = cell.getValueOffset();
|
||||
int vlength = cell.getValueLength();
|
||||
// checkKey uses comparator to check we are writing in order.
|
||||
boolean dupKey = checkKey(cell);
|
||||
checkValue(value, voffset, vlength);
|
||||
if (!dupKey) {
|
||||
checkBlockBoundary();
|
||||
}
|
||||
|
||||
if (!fsBlockWriter.isWriting()) {
|
||||
newBlock();
|
||||
}
|
||||
|
||||
fsBlockWriter.write(cell);
|
||||
|
||||
totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
|
||||
totalValueLength += vlength;
|
||||
|
||||
// Are we the first key in this block?
|
||||
if (firstCellInBlock == null) {
|
||||
// If cell is big, block will be closed and this firstCellInBlock reference will only last
|
||||
// a short while.
|
||||
firstCellInBlock = cell;
|
||||
}
|
||||
|
||||
// TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinetly?
|
||||
lastCell = cell;
|
||||
entryCount++;
|
||||
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
|
||||
int tagsLength = cell.getTagsLength();
|
||||
if (tagsLength > this.maxTagsLength) {
|
||||
this.maxTagsLength = tagsLength;
|
||||
}
|
||||
}
|
||||
|
||||
protected void finishFileInfo() throws IOException {
|
||||
if (lastCell != null) {
|
||||
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
|
||||
// byte buffer. Won't take a tuple.
|
||||
byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
|
||||
fileInfo.append(FileInfo.LASTKEY, lastKey, false);
|
||||
}
|
||||
|
||||
// Average key length.
|
||||
int avgKeyLen =
|
||||
entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
|
||||
fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
|
||||
fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
|
||||
false);
|
||||
|
||||
// Average value length.
|
||||
int avgValueLen =
|
||||
entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
|
||||
fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
|
||||
if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
|
||||
// In case of Prefix Tree encoding, we always write tags information into HFiles even if all
|
||||
// KVs are having no tags.
|
||||
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
|
||||
} else if (hFileContext.isIncludesTags()) {
|
||||
// When tags are not being written in this file, MAX_TAGS_LEN is excluded
|
||||
// from the FileInfo
|
||||
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
|
||||
boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
|
||||
&& hFileContext.isCompressTags();
|
||||
fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
|
||||
}
|
||||
}
|
||||
|
||||
protected int getMajorVersion() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
protected int getMinorVersion() {
|
||||
return HFileReaderImpl.MAX_MINOR_VERSION;
|
||||
}
|
||||
|
||||
protected void finishClose(FixedFileTrailer trailer) throws IOException {
|
||||
// Write out encryption metadata before finalizing if we have a valid crypto context
|
||||
Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
|
||||
if (cryptoContext != Encryption.Context.NONE) {
|
||||
// Wrap the context's key and write it as the encryption metadata, the wrapper includes
|
||||
// all information needed for decryption
|
||||
trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
|
||||
cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
|
||||
User.getCurrent().getShortName()),
|
||||
cryptoContext.getKey()));
|
||||
}
|
||||
// Now we can finish the close
|
||||
trailer.setMetaIndexCount(metaNames.size());
|
||||
trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
|
||||
trailer.setEntryCount(entryCount);
|
||||
trailer.setCompressionCodec(hFileContext.getCompression());
|
||||
|
||||
trailer.serialize(outputStream);
|
||||
|
||||
if (closeOutputStream) {
|
||||
outputStream.close();
|
||||
outputStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* {@link HFile} writer for version 3.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileWriterV3 extends HFileWriterV2 {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HFileWriterV3.class);
|
||||
|
||||
private int maxTagsLength = 0;
|
||||
|
||||
static class WriterFactoryV3 extends HFile.WriterFactory {
|
||||
WriterFactoryV3(Configuration conf, CacheConfig cacheConf) {
|
||||
super(conf, cacheConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
|
||||
final KVComparator comparator, HFileContext fileContext)
|
||||
throws IOException {
|
||||
return new HFileWriterV3(conf, cacheConf, fs, path, ostream, comparator, fileContext);
|
||||
}
|
||||
}
|
||||
|
||||
/** Constructor that takes a path, creates and closes the output stream. */
|
||||
public HFileWriterV3(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path,
|
||||
FSDataOutputStream ostream, final KVComparator comparator,
|
||||
final HFileContext fileContext) throws IOException {
|
||||
super(conf, cacheConf, fs, path, ostream, comparator, fileContext);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Writer" + (path != null ? " for " + path : "") +
|
||||
" initialized with cacheConf: " + cacheConf +
|
||||
" comparator: " + comparator.getClass().getSimpleName() +
|
||||
" fileContext: " + fileContext);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param cell
|
||||
* Cell to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void append(final Cell cell) throws IOException {
|
||||
// Currently get the complete arrays
|
||||
super.append(cell);
|
||||
int tagsLength = cell.getTagsLength();
|
||||
if (tagsLength > this.maxTagsLength) {
|
||||
this.maxTagsLength = tagsLength;
|
||||
}
|
||||
}
|
||||
|
||||
protected void finishFileInfo() throws IOException {
|
||||
super.finishFileInfo();
|
||||
if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
|
||||
// In case of Prefix Tree encoding, we always write tags information into HFiles even if all
|
||||
// KVs are having no tags.
|
||||
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
|
||||
} else if (hFileContext.isIncludesTags()) {
|
||||
// When tags are not being written in this file, MAX_TAGS_LEN is excluded
|
||||
// from the FileInfo
|
||||
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
|
||||
boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
|
||||
&& hFileContext.isCompressTags();
|
||||
fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getMajorVersion() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getMinorVersion() {
|
||||
return HFileReaderV3.MAX_MINOR_VERSION;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finishClose(FixedFileTrailer trailer) throws IOException {
|
||||
// Write out encryption metadata before finalizing if we have a valid crypto context
|
||||
Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
|
||||
if (cryptoContext != Encryption.Context.NONE) {
|
||||
// Wrap the context's key and write it as the encryption metadata, the wrapper includes
|
||||
// all information needed for decryption
|
||||
trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
|
||||
cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
|
||||
User.getCurrent().getShortName()),
|
||||
cryptoContext.getKey()));
|
||||
}
|
||||
// Now we can finish the close
|
||||
super.finishClose(trailer);
|
||||
}
|
||||
|
||||
}
|
|
@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
|||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
|
@ -129,7 +129,7 @@ public class HFileOutputFormat2
|
|||
// Invented config. Add to hbase-*.xml if other than default compression.
|
||||
final String defaultCompressionStr = conf.get("hfile.compression",
|
||||
Compression.Algorithm.NONE.getName());
|
||||
final Algorithm defaultCompression = HFileWriterImpl
|
||||
final Algorithm defaultCompression = AbstractHFileWriter
|
||||
.compressionByName(defaultCompressionStr);
|
||||
final boolean compactionExclude = conf.getBoolean(
|
||||
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
|
||||
|
@ -245,7 +245,7 @@ public class HFileOutputFormat2
|
|||
.withBlockSize(blockSize);
|
||||
contextBuilder.withDataBlockEncoding(encoding);
|
||||
HFileContext hFileContext = contextBuilder.build();
|
||||
|
||||
|
||||
wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
|
||||
.withOutputDir(familydir).withBloomType(bloomType)
|
||||
.withComparator(KeyValue.COMPARATOR)
|
||||
|
@ -358,7 +358,7 @@ public class HFileOutputFormat2
|
|||
* </ul>
|
||||
* The user should be sure to set the map output value class to either KeyValue or Put before
|
||||
* running this function.
|
||||
*
|
||||
*
|
||||
* @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
|
@ -448,7 +448,7 @@ public class HFileOutputFormat2
|
|||
TableMapReduceUtil.initCredentials(job);
|
||||
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
|
||||
}
|
||||
|
||||
|
||||
public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
|
||||
|
@ -483,7 +483,8 @@ public class HFileOutputFormat2
|
|||
Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
|
||||
Algorithm>(Bytes.BYTES_COMPARATOR);
|
||||
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
|
||||
Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
|
||||
Algorithm algorithm = AbstractHFileWriter.compressionByName
|
||||
(e.getValue());
|
||||
compressionMap.put(e.getKey(), algorithm);
|
||||
}
|
||||
return compressionMap;
|
||||
|
|
|
@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
|
|||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.http.InfoServer;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
@ -491,7 +490,6 @@ public class HRegionServer extends HasThread implements
|
|||
throws IOException, InterruptedException {
|
||||
this.fsOk = true;
|
||||
this.conf = conf;
|
||||
HFile.checkHFileVersion(this.conf);
|
||||
checkCodecs(this.conf);
|
||||
this.userProvider = UserProvider.instantiate(conf);
|
||||
FSUtils.setupShortCircuitRead(this.conf);
|
||||
|
|
|
@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver {
|
|||
*/
|
||||
long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
|
||||
|
||||
/**
|
||||
/**
|
||||
* @return map of column family names to max sequence id that was read from storage when this
|
||||
* region was opened
|
||||
*/
|
||||
|
@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver {
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// Metrics
|
||||
|
||||
|
||||
/** @return read requests count for this region */
|
||||
long getReadRequestsCount();
|
||||
|
||||
|
@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver {
|
|||
|
||||
/** @return the number of mutations processed bypassing the WAL */
|
||||
long getNumMutationsWithoutWAL();
|
||||
|
||||
|
||||
/** @return the size of data processed bypassing the WAL, in bytes */
|
||||
long getDataInMemoryWithoutWAL();
|
||||
|
||||
|
@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver {
|
|||
|
||||
/**
|
||||
* This method needs to be called before any public call that reads or
|
||||
* modifies data.
|
||||
* modifies data.
|
||||
* Acquires a read lock and checks if the region is closing or closed.
|
||||
* <p>{@link #closeRegionOperation} MUST then always be called after
|
||||
* the operation has completed, whether it succeeded or failed.
|
||||
|
@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver {
|
|||
|
||||
/**
|
||||
* This method needs to be called before any public call that reads or
|
||||
* modifies data.
|
||||
* modifies data.
|
||||
* Acquires a read lock and checks if the region is closing or closed.
|
||||
* <p>{@link #closeRegionOperation} MUST then always be called after
|
||||
* the operation has completed, whether it succeeded or failed.
|
||||
|
@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver {
|
|||
|
||||
/**
|
||||
* Perform atomic mutations within the region.
|
||||
*
|
||||
*
|
||||
* @param mutations The list of mutations to perform.
|
||||
* <code>mutations</code> can contain operations for multiple rows.
|
||||
* Caller has to ensure that all rows are contained in this region.
|
||||
|
@ -588,7 +588,7 @@ public interface Region extends ConfigurationObserver {
|
|||
byte[] now) throws IOException;
|
||||
|
||||
/**
|
||||
* Replace any cell timestamps set to {@link org.apache.hadoop.hbase.HConstants#LATEST_TIMESTAMP}
|
||||
* Replace any cell timestamps set to HConstants#LATEST_TIMESTAMP with the
|
||||
* provided current timestamp.
|
||||
* @param values
|
||||
* @param now
|
||||
|
@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver {
|
|||
CANNOT_FLUSH_MEMSTORE_EMPTY,
|
||||
CANNOT_FLUSH
|
||||
}
|
||||
|
||||
|
||||
/** @return the detailed result code */
|
||||
Result getResult();
|
||||
|
||||
/** @return true if the memstores were flushed, else false */
|
||||
boolean isFlushSucceeded();
|
||||
|
||||
|
||||
/** @return True if the flush requested a compaction, else false */
|
||||
boolean isCompactionNeeded();
|
||||
}
|
||||
|
@ -647,7 +647,7 @@ public interface Region extends ConfigurationObserver {
|
|||
* Synchronously compact all stores in the region.
|
||||
* <p>This operation could block for a long time, so don't call it from a
|
||||
* time-sensitive thread.
|
||||
* <p>Note that no locks are taken to prevent possible conflicts between
|
||||
* <p>Note that no locks are taken to prevent possible conflicts between
|
||||
* compaction and splitting activities. The regionserver does not normally compact
|
||||
* and split in parallel. However by calling this method you may introduce
|
||||
* unexpected and unhandled concurrency. Don't do this unless you know what
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
|
||||
import org.apache.hadoop.hbase.util.BloomFilter;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
||||
|
@ -408,7 +409,7 @@ public class StoreFile {
|
|||
}
|
||||
this.reader.setSequenceID(this.sequenceid);
|
||||
|
||||
b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
|
||||
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
|
||||
if (b != null) {
|
||||
this.maxMemstoreTS = Bytes.toLong(b);
|
||||
}
|
||||
|
|
|
@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
|
||||
|
@ -142,7 +142,7 @@ public abstract class Compactor {
|
|||
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
|
||||
}
|
||||
else {
|
||||
tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
|
||||
tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
|
||||
if (tmp != null) {
|
||||
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
|
@ -120,7 +120,7 @@ public class CompressionTest {
|
|||
throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
HFileContext context = new HFileContextBuilder()
|
||||
.withCompression(HFileWriterImpl.compressionByName(codec)).build();
|
||||
.withCompression(AbstractHFileWriter.compressionByName(codec)).build();
|
||||
HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
|
||||
.withPath(fs, path)
|
||||
.withFileContext(context)
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
|||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
|
@ -326,7 +326,7 @@ public class HFilePerformanceEvaluation {
|
|||
void setUp() throws Exception {
|
||||
|
||||
HFileContextBuilder builder = new HFileContextBuilder()
|
||||
.withCompression(HFileWriterImpl.compressionByName(codec))
|
||||
.withCompression(AbstractHFileWriter.compressionByName(codec))
|
||||
.withBlockSize(RFILE_BLOCKSIZE);
|
||||
|
||||
if (cipher == "aes") {
|
||||
|
|
|
@ -241,6 +241,7 @@ public class TestCacheOnWrite {
|
|||
public void setUp() throws IOException {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
this.conf.set("dfs.datanode.data.dir.perm", "700");
|
||||
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
|
||||
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
|
||||
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
|
||||
BLOOM_BLOCK_SIZE);
|
||||
|
@ -270,7 +271,12 @@ public class TestCacheOnWrite {
|
|||
}
|
||||
|
||||
private void readStoreFile(boolean useTags) throws IOException {
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
|
||||
AbstractHFileReader reader;
|
||||
if (useTags) {
|
||||
reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
|
||||
} else {
|
||||
reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
|
||||
}
|
||||
LOG.info("HFile information: " + reader);
|
||||
HFileContext meta = new HFileContextBuilder().withCompression(compress)
|
||||
.withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
|
||||
|
@ -371,6 +377,11 @@ public class TestCacheOnWrite {
|
|||
}
|
||||
|
||||
private void writeStoreFile(boolean useTags) throws IOException {
|
||||
if(useTags) {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
} else {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
|
||||
}
|
||||
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
|
||||
"test_cache_on_write");
|
||||
HFileContext meta = new HFileContextBuilder().withCompression(compress)
|
||||
|
@ -410,6 +421,11 @@ public class TestCacheOnWrite {
|
|||
|
||||
private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
|
||||
throws IOException, InterruptedException {
|
||||
if (useTags) {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
} else {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
|
||||
}
|
||||
// TODO: need to change this test if we add a cache size threshold for
|
||||
// compactions, or if we implement some other kind of intelligent logic for
|
||||
// deciding what blocks to cache-on-write on compaction.
|
||||
|
|
|
@ -89,7 +89,7 @@ public class TestFixedFileTrailer {
|
|||
@Test
|
||||
public void testTrailer() throws IOException {
|
||||
FixedFileTrailer t = new FixedFileTrailer(version,
|
||||
HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION);
|
||||
HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
|
||||
t.setDataIndexCount(3);
|
||||
t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
|
||||
|
||||
|
@ -122,7 +122,7 @@ public class TestFixedFileTrailer {
|
|||
{
|
||||
DataInputStream dis = new DataInputStream(bais);
|
||||
FixedFileTrailer t2 = new FixedFileTrailer(version,
|
||||
HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION);
|
||||
HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
|
||||
t2.deserialize(dis);
|
||||
assertEquals(-1, bais.read()); // Ensure we have read everything.
|
||||
checkLoadedTrailer(version, t, t2);
|
||||
|
@ -171,7 +171,7 @@ public class TestFixedFileTrailer {
|
|||
public void testTrailerForV2NonPBCompatibility() throws Exception {
|
||||
if (version == 2) {
|
||||
FixedFileTrailer t = new FixedFileTrailer(version,
|
||||
HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM);
|
||||
HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
|
||||
t.setDataIndexCount(3);
|
||||
t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
|
||||
t.setLastDataBlockOffset(291);
|
||||
|
@ -198,7 +198,7 @@ public class TestFixedFileTrailer {
|
|||
{
|
||||
DataInputStream dis = new DataInputStream(bais);
|
||||
FixedFileTrailer t2 = new FixedFileTrailer(version,
|
||||
HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM);
|
||||
HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
|
||||
t2.deserialize(dis);
|
||||
assertEquals(-1, bais.read()); // Ensure we have read everything.
|
||||
checkLoadedTrailer(version, t, t2);
|
||||
|
|
|
@ -81,6 +81,8 @@ public class TestForceCacheImportantBlocks {
|
|||
public static Collection<Object[]> parameters() {
|
||||
// HFile versions
|
||||
return Arrays.asList(
|
||||
new Object[] { 2, true },
|
||||
new Object[] { 2, false },
|
||||
new Object[] { 3, true },
|
||||
new Object[] { 3, false }
|
||||
);
|
||||
|
|
|
@ -239,7 +239,7 @@ public class TestHFile extends HBaseTestCase {
|
|||
FSDataOutputStream fout = createFSOutput(ncTFile);
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withBlockSize(minBlockSize)
|
||||
.withCompression(HFileWriterImpl.compressionByName(codec))
|
||||
.withCompression(AbstractHFileWriter.compressionByName(codec))
|
||||
.build();
|
||||
Writer writer = HFile.getWriterFactory(conf, cacheConf)
|
||||
.withOutputStream(fout)
|
||||
|
@ -330,7 +330,7 @@ public class TestHFile extends HBaseTestCase {
|
|||
Path mFile = new Path(ROOT_DIR, "meta.hfile");
|
||||
FSDataOutputStream fout = createFSOutput(mFile);
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withCompression(HFileWriterImpl.compressionByName(compress))
|
||||
.withCompression(AbstractHFileWriter.compressionByName(compress))
|
||||
.withBlockSize(minBlockSize).build();
|
||||
Writer writer = HFile.getWriterFactory(conf, cacheConf)
|
||||
.withOutputStream(fout)
|
||||
|
|
|
@ -589,7 +589,7 @@ public class TestHFileBlockIndex {
|
|||
}
|
||||
|
||||
// Manually compute the mid-key and validate it.
|
||||
HFile.Reader reader2 = reader;
|
||||
HFileReaderV2 reader2 = (HFileReaderV2) reader;
|
||||
HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader();
|
||||
|
||||
HFileBlock.BlockIterator iter = fsReader.blockRange(0,
|
||||
|
|
|
@ -54,7 +54,8 @@ public class TestHFileInlineToRootChunkConversion {
|
|||
CacheConfig cacheConf = new CacheConfig(conf);
|
||||
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize);
|
||||
HFileContext context = new HFileContextBuilder().withBlockSize(16).build();
|
||||
HFile.Writer hfw = new HFileWriterFactory(conf, cacheConf)
|
||||
HFileWriterV2 hfw =
|
||||
(HFileWriterV2) new HFileWriterV2.WriterFactoryV2(conf, cacheConf)
|
||||
.withFileContext(context)
|
||||
.withPath(fs, hfPath).create();
|
||||
List<byte[]> keys = new ArrayList<byte[]>();
|
||||
|
@ -76,7 +77,7 @@ public class TestHFileInlineToRootChunkConversion {
|
|||
}
|
||||
hfw.close();
|
||||
|
||||
HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf);
|
||||
HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs, hfPath, cacheConf, conf);
|
||||
// Scanner doesn't do Cells yet. Fix.
|
||||
HFileScanner scanner = reader.getScanner(true, true);
|
||||
for (int i = 0; i < keys.size(); ++i) {
|
||||
|
@ -84,4 +85,4 @@ public class TestHFileInlineToRootChunkConversion {
|
|||
}
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -129,7 +129,7 @@ public class TestHFileSeek extends TestCase {
|
|||
try {
|
||||
HFileContext context = new HFileContextBuilder()
|
||||
.withBlockSize(options.minBlockSize)
|
||||
.withCompression(HFileWriterImpl.compressionByName(options.compress))
|
||||
.withCompression(AbstractHFileWriter.compressionByName(options.compress))
|
||||
.build();
|
||||
Writer writer = HFile.getWriterFactoryNoCache(conf)
|
||||
.withOutputStream(fout)
|
||||
|
|
|
@ -55,7 +55,7 @@ import org.junit.experimental.categories.Category;
|
|||
|
||||
/**
|
||||
* Testing writing a version 2 {@link HFile}. This is a low-level test written
|
||||
* during the development of {@link HFileWriterImpl}.
|
||||
* during the development of {@link HFileWriterV2}.
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestHFileWriterV2 {
|
||||
|
@ -98,7 +98,8 @@ public class TestHFileWriterV2 {
|
|||
.withBlockSize(4096)
|
||||
.withCompression(compressAlgo)
|
||||
.build();
|
||||
HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf))
|
||||
HFileWriterV2 writer = (HFileWriterV2)
|
||||
new HFileWriterV2.WriterFactoryV2(conf, new CacheConfig(conf))
|
||||
.withPath(fs, hfilePath)
|
||||
.withFileContext(context)
|
||||
.create();
|
||||
|
@ -134,6 +135,7 @@ public class TestHFileWriterV2 {
|
|||
FixedFileTrailer trailer =
|
||||
FixedFileTrailer.readFromStream(fsdis, fileSize);
|
||||
|
||||
assertEquals(2, trailer.getMajorVersion());
|
||||
assertEquals(entryCount, trailer.getEntryCount());
|
||||
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
|
@ -174,7 +176,8 @@ public class TestHFileWriterV2 {
|
|||
// File info
|
||||
FileInfo fileInfo = new FileInfo();
|
||||
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
|
||||
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
|
||||
byte [] keyValueFormatVersion = fileInfo.get(
|
||||
HFileWriterV2.KEY_VALUE_VERSION);
|
||||
boolean includeMemstoreTS = keyValueFormatVersion != null &&
|
||||
Bytes.toInt(keyValueFormatVersion) > 0;
|
||||
|
||||
|
|
|
@ -59,7 +59,8 @@ import org.junit.runners.Parameterized;
|
|||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
/**
|
||||
* Testing writing a version 3 {@link HFile}.
|
||||
* Testing writing a version 3 {@link HFile}. This is a low-level test written
|
||||
* during the development of {@link HFileWriterV3}.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
@Category(SmallTests.class)
|
||||
|
@ -118,7 +119,8 @@ public class TestHFileWriterV3 {
|
|||
.withBlockSize(4096)
|
||||
.withIncludesTags(useTags)
|
||||
.withCompression(compressAlgo).build();
|
||||
HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf))
|
||||
HFileWriterV3 writer = (HFileWriterV3)
|
||||
new HFileWriterV3.WriterFactoryV3(conf, new CacheConfig(conf))
|
||||
.withPath(fs, hfilePath)
|
||||
.withFileContext(context)
|
||||
.withComparator(KeyValue.COMPARATOR)
|
||||
|
@ -203,7 +205,8 @@ public class TestHFileWriterV3 {
|
|||
// File info
|
||||
FileInfo fileInfo = new FileInfo();
|
||||
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
|
||||
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
|
||||
byte [] keyValueFormatVersion = fileInfo.get(
|
||||
HFileWriterV3.KEY_VALUE_VERSION);
|
||||
boolean includeMemstoreTS = keyValueFormatVersion != null &&
|
||||
Bytes.toInt(keyValueFormatVersion) > 0;
|
||||
|
||||
|
|
|
@ -88,7 +88,8 @@ public class TestLazyDataBlockDecompression {
|
|||
*/
|
||||
private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
|
||||
HFileContext cxt, int entryCount) throws IOException {
|
||||
HFile.Writer writer = new HFileWriterFactory(conf, cc)
|
||||
HFileWriterV2 writer = (HFileWriterV2)
|
||||
new HFileWriterV2.WriterFactoryV2(conf, cc)
|
||||
.withPath(fs, path)
|
||||
.withFileContext(cxt)
|
||||
.create();
|
||||
|
@ -116,7 +117,7 @@ public class TestLazyDataBlockDecompression {
|
|||
long fileSize = fs.getFileStatus(path).getLen();
|
||||
FixedFileTrailer trailer =
|
||||
FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
|
||||
HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig,
|
||||
HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig,
|
||||
fsdis.getHfs(), conf);
|
||||
reader.loadFileInfo();
|
||||
long offset = trailer.getFirstDataBlockOffset(),
|
||||
|
|
|
@ -54,6 +54,7 @@ public class TestPrefetch {
|
|||
@Before
|
||||
public void setUp() throws IOException {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
|
||||
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
|
||||
fs = HFileSystem.get(conf);
|
||||
CacheConfig.blockCacheDisabled = false;
|
||||
|
@ -68,9 +69,10 @@ public class TestPrefetch {
|
|||
|
||||
private void readStoreFile(Path storeFilePath) throws Exception {
|
||||
// Open the file
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
|
||||
HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
|
||||
storeFilePath, cacheConf, conf);
|
||||
|
||||
while (!reader.prefetchComplete()) {
|
||||
while (!((HFileReaderV3)reader).prefetchComplete()) {
|
||||
// Sleep for a bit
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test {@link HFileScanner#reseekTo(org.apache.hadoop.hbase.Cell)}
|
||||
* Test {@link HFileScanner#reseekTo(byte[])}
|
||||
*/
|
||||
@Category(SmallTests.class)
|
||||
public class TestReseekTo {
|
||||
|
|
|
@ -70,7 +70,12 @@ public class TestSeekTo extends HBaseTestCase {
|
|||
}
|
||||
|
||||
Path makeNewFile(TagUsage tagUsage) throws IOException {
|
||||
Path ncTFile = new Path(testDir, "basic.hfile");
|
||||
Path ncTFile = new Path(this.testDir, "basic.hfile");
|
||||
if (tagUsage != TagUsage.NO_TAG) {
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
} else {
|
||||
conf.setInt("hfile.format.version", 2);
|
||||
}
|
||||
FSDataOutputStream fout = this.fs.create(ncTFile);
|
||||
int blocksize = toKV("a", tagUsage).getLength() * 3;
|
||||
HFileContext context = new HFileContextBuilder().withBlockSize(blocksize)
|
||||
|
@ -133,7 +138,7 @@ public class TestSeekTo extends HBaseTestCase {
|
|||
}
|
||||
|
||||
public void testSeekBeforeWithReSeekTo() throws Exception {
|
||||
testSeekBeforeInternals(TagUsage.NO_TAG);
|
||||
testSeekBeforeWithReSeekToInternals(TagUsage.NO_TAG);
|
||||
testSeekBeforeWithReSeekToInternals(TagUsage.ONLY_TAG);
|
||||
testSeekBeforeWithReSeekToInternals(TagUsage.PARTIAL_TAG);
|
||||
}
|
||||
|
@ -222,7 +227,7 @@ public class TestSeekTo extends HBaseTestCase {
|
|||
}
|
||||
|
||||
public void testSeekTo() throws Exception {
|
||||
testSeekBeforeInternals(TagUsage.NO_TAG);
|
||||
testSeekToInternals(TagUsage.NO_TAG);
|
||||
testSeekToInternals(TagUsage.ONLY_TAG);
|
||||
testSeekToInternals(TagUsage.PARTIAL_TAG);
|
||||
}
|
||||
|
@ -250,7 +255,7 @@ public class TestSeekTo extends HBaseTestCase {
|
|||
reader.close();
|
||||
}
|
||||
public void testBlockContainingKey() throws Exception {
|
||||
testSeekBeforeInternals(TagUsage.NO_TAG);
|
||||
testBlockContainingKeyInternals(TagUsage.NO_TAG);
|
||||
testBlockContainingKeyInternals(TagUsage.ONLY_TAG);
|
||||
testBlockContainingKeyInternals(TagUsage.PARTIAL_TAG);
|
||||
}
|
||||
|
@ -259,7 +264,7 @@ public class TestSeekTo extends HBaseTestCase {
|
|||
Path p = makeNewFile(tagUsage);
|
||||
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
|
||||
reader.loadFileInfo();
|
||||
HFileBlockIndex.BlockIndexReader blockIndexReader =
|
||||
HFileBlockIndex.BlockIndexReader blockIndexReader =
|
||||
reader.getDataBlockIndexReader();
|
||||
System.out.println(blockIndexReader.toString());
|
||||
// falls before the start of the file.
|
||||
|
|
|
@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
|
@ -602,9 +602,8 @@ public class DataBlockEncodingTool {
|
|||
// run the utilities
|
||||
DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
|
||||
int majorVersion = reader.getHFileVersion();
|
||||
comp.useHBaseChecksum = majorVersion > 2 ||
|
||||
(majorVersion == 2 &&
|
||||
reader.getHFileMinorVersion() >= HFileReaderImpl.MINOR_VERSION_WITH_CHECKSUM);
|
||||
comp.useHBaseChecksum = majorVersion > 2
|
||||
|| (majorVersion == 2 && reader.getHFileMinorVersion() >= HFileReaderV2.MINOR_VERSION_WITH_CHECKSUM);
|
||||
comp.checkStatistics(scanner, kvLimit);
|
||||
if (doVerify) {
|
||||
comp.verifyCodecs(scanner, kvLimit);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
|
@ -219,7 +220,7 @@ public class TestCacheOnWriteInSchema {
|
|||
BlockCache cache = cacheConf.getBlockCache();
|
||||
StoreFile sf = new StoreFile(fs, path, conf, cacheConf,
|
||||
BloomType.ROWCOL);
|
||||
HFile.Reader reader = sf.createReader().getHFileReader();
|
||||
HFileReaderV2 reader = (HFileReaderV2) sf.createReader().getHFileReader();
|
||||
try {
|
||||
// Open a scanner with (on read) caching disabled
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
|
|
Loading…
Reference in New Issue