HBASE-7970 Improve file descriptor usage: currently, there are two file descriptors per storefile

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1476677 13f79535-47bb-0310-9956-ffa450edef68
sershe 2013-04-27 21:34:43 +00:00
parent e2868556df
commit 4e28bd8bab
10 changed files with 319 additions and 245 deletions

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FileLink;
import com.google.common.annotations.VisibleForTesting;
/**
* Wrapper for input stream(s) that takes care of the interaction of FS and HBase checksums,
* as well as closing streams. Initialization is not thread-safe, but normal operation is;
* see method comments.
*/
public class FSDataInputStreamWrapper {
private final HFileSystem hfs;
private final Path path;
private final FileLink link;
private final boolean doCloseStreams;
/** Two stream handles, one with and one without FS-level checksum.
* HDFS checksum setting is on FS level, not single read level, so you have to keep two
* FS objects and two handles open to interleave different reads freely, which is very sad.
* This is what we do:
* 1) First, we need to read the trailer of HFile to determine checksum parameters.
* We always use FS checksum to do that, so ctor opens {@link #stream}.
* 2.1) After that, if HBase checksum is not used, we'd just always use {@link #stream};
* 2.2) If HBase checksum can be used, we'll open {@link #streamNoFsChecksum},
* and close {@link #stream}. User MUST call prepareForBlockReader for that to happen;
* if they don't, (2.1) will be the default.
* 3) The users can call {@link #shouldUseHBaseChecksum()}, and pass its result to
* {@link #getStream(boolean)} to get stream (if Java had out/pointer params we could
* return both in one call). This stream is guaranteed to be set.
* 4) The first time HBase checksum fails, one would call {@link #fallbackToFsChecksum(int)}.
* That will take lock, and open {@link #stream}. While this is going on, others will
* continue to use the old stream; if they also want to fall back, they'll also call
* {@link #fallbackToFsChecksum(int)}, and block until {@link #stream} is set.
* 5) After some number of checksumOk() calls, we will go back to using HBase checksum.
* We will have 2 handles; however we presume checksums fail so rarely that we don't care.
*/
private volatile FSDataInputStream stream = null;
private volatile FSDataInputStream streamNoFsChecksum = null;
private Object streamNoFsChecksumFirstCreateLock = new Object();
// The configuration states that we should validate hbase checksums
private boolean useHBaseChecksumConfigured;
// Record the current state of this reader with respect to
// validating checksums in HBase. This is initially set to the same
// value as useHBaseChecksumConfigured, but can change state as and when
// we encounter checksum verification failures.
private volatile boolean useHBaseChecksum;
// In the case of a checksum failure, do this many subsequent
// reads without hbase checksum verification.
private volatile int hbaseChecksumOffCount = -1;
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, null, path);
}
public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
this(fs, link, null);
}
private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
assert (path == null) != (link == null);
this.path = path;
this.link = link;
this.doCloseStreams = true;
// If the fs is not an instance of HFileSystem, then create an instance of HFileSystem
// that wraps over the specified fs. In this case, we will not be able to avoid
// checksumming inside the filesystem.
this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
}
/**
* Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
* reads finish and before any other reads start (what happens in reality is we read the
* tail, then call this based on what's in the tail, then read blocks).
* @param forceNoHBaseChecksum Force not using HBase checksum.
*/
public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
if (hfs == null) return;
assert this.stream != null && !this.useHBaseChecksumConfigured;
boolean useHBaseChecksum =
!forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
if (useHBaseChecksum) {
FileSystem fsNc = hfs.getNoChecksumFs();
this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
// Close the checksum stream; we will reopen it if we get an HBase checksum failure.
this.stream.close();
this.stream = null;
}
}
/** For use in tests. */
@VisibleForTesting
public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
this(fsdis, fsdis);
}
/** For use in tests. */
@VisibleForTesting
public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
doCloseStreams = false;
stream = fsdis;
streamNoFsChecksum = noChecksum;
path = null;
link = null;
hfs = null;
useHBaseChecksumConfigured = useHBaseChecksum = false;
}
/**
* @return Whether we are presently using HBase checksum.
*/
public boolean shouldUseHBaseChecksum() {
return this.useHBaseChecksum;
}
/**
* Get the stream to use. Thread-safe.
* @param useHBaseChecksum must be the value that shouldUseHBaseChecksum has returned
* at some point in the past, otherwise the result is undefined.
*/
public FSDataInputStream getStream(boolean useHBaseChecksum) {
return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
}
/**
* Read from non-checksum stream failed, fall back to FS checksum. Thread-safe.
* @param offCount For how many checksumOk calls to turn off the HBase checksum.
*/
public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
// checksumOffCount is speculative, but let's try to reset it less.
boolean partOfConvoy = false;
if (this.stream == null) {
synchronized (streamNoFsChecksumFirstCreateLock) {
partOfConvoy = (this.stream != null);
if (!partOfConvoy) {
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
}
}
}
if (!partOfConvoy) {
this.useHBaseChecksum = false;
this.hbaseChecksumOffCount = offCount;
}
return this.stream;
}
/** Report that checksum was ok, so we may ponder going back to HBase checksum. */
public void checksumOk() {
if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
&& (this.hbaseChecksumOffCount-- < 0)) {
// The stream we need is already open (because we were using HBase checksum in the past).
assert this.streamNoFsChecksum != null;
this.useHBaseChecksum = true;
}
}
/** Close stream(s) if necessary. */
public void close() throws IOException {
if (!doCloseStreams) return;
try {
if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
streamNoFsChecksum.close();
streamNoFsChecksum = null;
}
} finally {
if (stream != null) {
stream.close();
stream = null;
}
}
}
public HFileSystem getHfs() {
return this.hfs;
}
}
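
The numbered life cycle in the class comment above maps onto the public API as follows. This is a minimal usage sketch, not part of the patch: readTrailer() and verifyBlock() are hypothetical stand-ins for the HFile trailer/block plumbing, and the off-count value is illustrative.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;

public class WrapperLifecycleSketch {
  private static final int CHECKSUM_OFF_COUNT = 3; // illustrative value, not from the patch

  void readOnce(FileSystem fs, Path path) throws IOException {
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
    try {
      // 1) The trailer is always read through the FS-checksummed stream.
      readTrailer(wrapper.getStream(false));
      // 2) Switch to HBase-level checksums where possible; this may close the
      //    FS-checksum stream and open the no-checksum one.
      wrapper.prepareForBlockReader(false);
      // 3) Normal read path: query the mode once, then fetch the matching stream.
      boolean hbaseChecksum = wrapper.shouldUseHBaseChecksum();
      FSDataInputStream in = wrapper.getStream(hbaseChecksum);
      if (hbaseChecksum && !verifyBlock(in)) {
        // 4) On an HBase checksum failure, fall back to FS checksums for a while.
        in = wrapper.fallbackToFsChecksum(CHECKSUM_OFF_COUNT);
        verifyBlock(in); // re-read, now relying on FS-level verification
      }
      // 5) Reporting a clean read counts toward re-enabling HBase checksums
      //    (a no-op unless a fallback is currently active).
      wrapper.checksumOk();
    } finally {
      wrapper.close();
    }
  }

  private void readTrailer(FSDataInputStream in) { /* hypothetical helper */ }
  private boolean verifyBlock(FSDataInputStream in) { return true; /* hypothetical helper */ }
}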

View File

@ -87,18 +87,17 @@ public class HalfStoreFileReader extends StoreFile.Reader {
* Creates a half file reader for a hfile referred to by an hfilelink.
* @param fs filesystem to read from
* @param p path to hfile
* @param in {@link FSDataInputStream}
* @param inNoChecksum {@link FSDataInputStream} opened on a filesystem without checksum
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the hfile file
* @param cacheConf
* @param r original reference file (contains top or bottom)
* @param preferredEncodingInCache
* @throws IOException
*/
public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStream in,
final FSDataInputStream inNoChecksum, long size, final CacheConfig cacheConf,
final Reference r, final DataBlockEncoding preferredEncodingInCache) throws IOException {
super(fs, p, in, inNoChecksum, size, cacheConf, preferredEncodingInCache, true);
public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in,
long size, final CacheConfig cacheConf, final Reference r,
final DataBlockEncoding preferredEncodingInCache) throws IOException {
super(fs, p, in, size, cacheConf, preferredEncodingInCache);
// This is not the actual midkey for this half-file; it's just the border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't

View File

@ -36,10 +36,6 @@ import org.apache.hadoop.io.RawComparator;
*/
@InterfaceAudience.Private
public abstract class AbstractHFileReader implements HFile.Reader {
/** Filesystem-level block reader for this HFile format version. */
protected HFileBlock.FSReader fsBlockReader;
/** Stream to read from. Does checksum verifications in file system */
protected FSDataInputStream istream;
@ -47,12 +43,6 @@ public abstract class AbstractHFileReader implements HFile.Reader {
* does not do checksum verification in the file system */
protected FSDataInputStream istreamNoFsChecksum;
/**
* True if we should close the input stream when done. We don't close it if we
* didn't open it.
*/
protected final boolean closeIStream;
/** Data block index reader keeping the root data index in memory */
protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
@ -101,27 +91,14 @@ public abstract class AbstractHFileReader implements HFile.Reader {
protected HFileSystem hfs;
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long fileSize,
final boolean closeIStream,
final CacheConfig cacheConf) {
this(path, trailer, fsdis, fsdis, fileSize, closeIStream, cacheConf, null);
}
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
final long fileSize,
final boolean closeIStream,
final CacheConfig cacheConf, final HFileSystem hfs) {
final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs) {
this.trailer = trailer;
this.compressAlgo = trailer.getCompressionCodec();
this.cacheConf = cacheConf;
this.fileSize = fileSize;
this.istream = fsdis;
this.closeIStream = closeIStream;
this.path = path;
this.name = path.getName();
this.hfs = hfs;
this.istreamNoFsChecksum = fsdisNoFsChecksum;
}
@SuppressWarnings("serial")
@ -341,9 +318,7 @@ public abstract class AbstractHFileReader implements HFile.Reader {
}
/** For testing */
HFileBlock.FSReader getUncachedBlockReader() {
return fsBlockReader;
}
abstract HFileBlock.FSReader getUncachedBlockReader();
public Path getPath() {
return path;

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.regionserver.StoreFile.WriterBuilder;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
@ -543,32 +544,29 @@ public class HFile {
* TODO This is a bad abstraction. See HBASE-6635.
*
* @param path hfile's path
* @param fsdis an open checksummed stream of path's file
* @param fsdisNoFsChecksum an open unchecksummed stream of path's file
* @param fsdis stream of path's file
* @param size max size of the trailer.
* @param closeIStream boolean for closing file after the getting the reader version.
* @param cacheConf Cache configuration values, cannot be null.
* @param preferredEncodingInCache
* @param hfs
* @return an appropriate instance of HFileReader
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
*/
private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
FSDataInputStream fsdisNoFsChecksum,
long size, boolean closeIStream, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, HFileSystem hfs)
throws IOException {
private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
long size, CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
HFileSystem hfs) throws IOException {
FixedFileTrailer trailer = null;
try {
trailer = FixedFileTrailer.readFromStream(fsdis, size);
boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
assert !isHBaseChecksum; // Initially we must read with FS checksum.
trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
} catch (IllegalArgumentException iae) {
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
}
switch (trailer.getMajorVersion()) {
case 2:
return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
size, closeIStream,
cacheConf, preferredEncodingInCache, hfs);
return new HFileReaderV2(
path, trailer, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
default:
throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
}
@ -586,43 +584,24 @@ public class HFile {
FileSystem fs, Path path, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache) throws IOException {
final boolean closeIStream = true;
HFileSystem hfs = null;
FSDataInputStream fsdis = fs.open(path);
FSDataInputStream fsdisNoFsChecksum = fsdis;
// If the fs is not an instance of HFileSystem, then create an
// instance of HFileSystem that wraps over the specified fs.
// In this case, we will not be able to avoid checksumming inside
// the filesystem.
if (!(fs instanceof HFileSystem)) {
hfs = new HFileSystem(fs);
} else {
hfs = (HFileSystem)fs;
// open a stream to read data without checksum verification in
// the filesystem
fsdisNoFsChecksum = hfs.getNoChecksumFs().open(path);
}
return pickReaderVersion(path, fsdis, fsdisNoFsChecksum,
fs.getFileStatus(path).getLen(), closeIStream, cacheConf,
preferredEncodingInCache, hfs);
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
cacheConf, preferredEncodingInCache, stream.getHfs());
}
/**
* @param fs A file system
* @param path Path to HFile
* @param fsdis an open checksummed stream of path's file
* @param fsdisNoFsChecksum an open unchecksummed stream of path's file
* @param fsdis a stream of path's file
* @param size max size of the trailer.
* @param cacheConf Cache configuration for hfile's contents
* @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
* @param closeIStream boolean for closing file after the getting the reader version.
* @return A version specific Hfile Reader
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
*/
public static Reader createReaderWithEncoding(
FileSystem fs, Path path, FSDataInputStream fsdis,
FSDataInputStream fsdisNoFsChecksum, long size, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, boolean closeIStream)
throws IOException {
public static Reader createReaderWithEncoding(FileSystem fs, Path path,
FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache) throws IOException {
HFileSystem hfs = null;
// If the fs is not an instance of HFileSystem, then create an
@ -634,9 +613,7 @@ public class HFile {
} else {
hfs = (HFileSystem)fs;
}
return pickReaderVersion(path, fsdis, fsdisNoFsChecksum, size,
closeIStream, cacheConf,
preferredEncodingInCache, hfs);
return pickReaderVersion(path, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
}
/**
@ -660,9 +637,8 @@ public class HFile {
static Reader createReaderFromStream(Path path,
FSDataInputStream fsdis, long size, CacheConfig cacheConf)
throws IOException {
final boolean closeIStream = false;
return pickReaderVersion(path, fsdis, fsdis, size, closeIStream, cacheConf,
DataBlockEncoding.NONE, null);
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
return pickReaderVersion(path, wrapper, size, cacheConf, DataBlockEncoding.NONE, null);
}
/**
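
For code that calls the public factories, the migration is mechanical: the two stream handles and the closeIStream flag collapse into a single wrapper argument, and the wrapper (or the reader built on top of it) takes over closing whatever it opened. A hedged before/after sketch of a call site; fs, path, cacheConf and encodingInCache are assumed to be in scope.

// Before HBASE-7970 (two handles plus a close flag):
//   HFile.Reader reader = HFile.createReaderWithEncoding(
//       fs, path, fsdis, fsdisNoFsChecksum, size, cacheConf, encodingInCache, true);
// After: one FSDataInputStreamWrapper owns both handles and their lifetime.
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
HFile.Reader reader = HFile.createReaderWithEncoding(
    fs, path, wrapper, fs.getFileStatus(path).getLen(), cacheConf, encodingInCache);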

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@ -1122,6 +1123,9 @@ public class HFileBlock implements Cacheable {
* @return an iterator of blocks between the two given offsets
*/
BlockIterator blockRange(long startOffset, long endOffset);
/** Closes the backing streams */
void closeStreams() throws IOException;
}
/**
@ -1129,15 +1133,6 @@ public class HFileBlock implements Cacheable {
* tools for implementing HFile format version-specific block readers.
*/
private abstract static class AbstractFSReader implements FSReader {
/** The file system stream of the underlying {@link HFile} that
* does checksum validations in the filesystem */
protected final FSDataInputStream istream;
/** The file system stream of the underlying {@link HFile} that
* does not do checksum verification in the file system */
protected final FSDataInputStream istreamNoFsChecksum;
/** Compression algorithm used by the {@link HFile} */
protected Compression.Algorithm compressAlgo;
@ -1161,19 +1156,14 @@ public class HFileBlock implements Cacheable {
/** The default buffer size for our buffered streams */
public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
public AbstractFSReader(FSDataInputStream istream,
FSDataInputStream istreamNoFsChecksum,
Algorithm compressAlgo,
long fileSize, int minorVersion, HFileSystem hfs, Path path)
throws IOException {
this.istream = istream;
public AbstractFSReader(Algorithm compressAlgo, long fileSize, int minorVersion,
HFileSystem hfs, Path path) throws IOException {
this.compressAlgo = compressAlgo;
this.fileSize = fileSize;
this.minorVersion = minorVersion;
this.hfs = hfs;
this.path = path;
this.hdrSize = headerSize(minorVersion);
this.istreamNoFsChecksum = istreamNoFsChecksum;
}
@Override
@ -1276,22 +1266,6 @@ public class HFileBlock implements Cacheable {
hdrSize;
}
/**
* Creates a buffered stream reading a certain slice of the file system
* input stream. We need this because the decompression we use seems to
* expect the input stream to be bounded.
*
* @param offset the starting file offset the bounded stream reads from
* @param size the size of the segment of the file the stream should read
* @param pread whether to use position reads
* @return a stream restricted to the given portion of the file
*/
protected InputStream createBufferedBoundedStream(long offset,
int size, boolean pread) {
return new BufferedInputStream(new BoundedRangeFileInputStream(istream,
offset, size, pread), Math.min(DEFAULT_BUFFER_SIZE, size));
}
/**
* @return The minorVersion of this HFile
*/
@ -1312,19 +1286,9 @@ public class HFileBlock implements Cacheable {
/** Reads version 2 blocks from the filesystem. */
static class FSReaderV2 extends AbstractFSReader {
// The configuration states that we should validate hbase checksums
private final boolean useHBaseChecksumConfigured;
// Record the current state of this reader with respect to
// validating checkums in HBase. This is originally set the same
// value as useHBaseChecksumConfigured, but can change state as and when
// we encounter checksum verification failures.
private volatile boolean useHBaseChecksum;
// In the case of a checksum failure, do these many succeeding
// reads without hbase checksum verification.
private volatile int checksumOffCount = -1;
/** The file system stream of the underlying {@link HFile} that
* does or doesn't do checksum validations in the filesystem */
protected FSDataInputStreamWrapper streamWrapper;
/** Whether we include memstore timestamp in data blocks */
protected boolean includesMemstoreTS;
@ -1345,30 +1309,14 @@ public class HFileBlock implements Cacheable {
}
};
public FSReaderV2(FSDataInputStream istream,
FSDataInputStream istreamNoFsChecksum, Algorithm compressAlgo,
long fileSize, int minorVersion, HFileSystem hfs, Path path)
throws IOException {
super(istream, istreamNoFsChecksum, compressAlgo, fileSize,
minorVersion, hfs, path);
public FSReaderV2(FSDataInputStreamWrapper stream, Algorithm compressAlgo, long fileSize,
int minorVersion, HFileSystem hfs, Path path) throws IOException {
super(compressAlgo, fileSize, minorVersion, hfs, path);
this.streamWrapper = stream;
// Older versions of HBase didn't support checksum.
boolean forceNoHBaseChecksum = (this.getMinorVersion() < MINOR_VERSION_WITH_CHECKSUM);
this.streamWrapper.prepareForBlockReader(forceNoHBaseChecksum);
if (hfs != null) {
// Check the configuration to determine whether hbase-level
// checksum verification is needed or not.
useHBaseChecksum = hfs.useHBaseChecksum();
} else {
// The configuration does not specify anything about hbase checksum
// validations. Set it to true here assuming that we will verify
// hbase checksums for all reads. For older files that do not have
// stored checksums, this flag will be reset later.
useHBaseChecksum = true;
}
// for older versions, hbase did not store checksums.
if (getMinorVersion() < MINOR_VERSION_WITH_CHECKSUM) {
useHBaseChecksum = false;
}
this.useHBaseChecksumConfigured = useHBaseChecksum;
defaultDecodingCtx =
new HFileBlockDefaultDecodingContext(compressAlgo);
encodedBlockDecodingCtx =
@ -1381,7 +1329,7 @@ public class HFileBlock implements Cacheable {
*/
FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
long fileSize) throws IOException {
this(istream, istream, compressAlgo, fileSize,
this(new FSDataInputStreamWrapper(istream), compressAlgo, fileSize,
HFileReaderV2.MAX_MINOR_VERSION, null, null);
}
@ -1400,20 +1348,14 @@ public class HFileBlock implements Cacheable {
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
int uncompressedSize, boolean pread) throws IOException {
// It is ok to get a reference to the stream here without any
// locks because it is marked final.
FSDataInputStream is = this.istreamNoFsChecksum;
// get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
// thread-safe but the one constaint is that if we decide
// to skip hbase checksum verification then we are
// guaranteed to use hdfs checksum verification.
boolean doVerificationThruHBaseChecksum = this.useHBaseChecksum;
if (!doVerificationThruHBaseChecksum) {
is = this.istream;
}
boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
HFileBlock blk = readBlockDataInternal(is, offset,
onDiskSizeWithHeaderL,
uncompressedSize, pread,
@ -1434,17 +1376,15 @@ public class HFileBlock implements Cacheable {
throw new IOException(msg); // cannot happen case here
}
HFile.checksumFailures.incrementAndGet(); // update metrics
// If we have a checksum failure, we fall back into a mode where
// the next few reads use HDFS level checksums. We aim to make the
// next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
// hbase checksum verification, but since this value is set without
// holding any locks, it can so happen that we might actually do
// a few more than precisely this number.
this.checksumOffCount = CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD;
this.useHBaseChecksum = false;
is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
doVerificationThruHBaseChecksum = false;
is = this.istream;
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
uncompressedSize, pread,
doVerificationThruHBaseChecksum);
@ -1469,11 +1409,7 @@ public class HFileBlock implements Cacheable {
// The decrementing of this.checksumOffCount is not thread-safe,
// but it is harmless because eventually checksumOffCount will be
// a negative number.
if (!this.useHBaseChecksum && this.useHBaseChecksumConfigured) {
if (this.checksumOffCount-- < 0) {
this.useHBaseChecksum = true; // auto re-enable hbase checksums
}
}
streamWrapper.checksumOk();
return blk;
}
@ -1678,6 +1614,11 @@ public class HFileBlock implements Cacheable {
return ChecksumUtil.validateBlockChecksum(path, block,
data, hdrSize);
}
@Override
public void closeStreams() throws IOException {
streamWrapper.close();
}
}
@Override
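
Pieced together from the hunks above, the per-read flow in FSReaderV2.readBlockData() now delegates the checksum-mode bookkeeping to the wrapper. A condensed sketch of that method body, not a verbatim copy of the patch; it relies on the reader's own fields (streamWrapper) and its existing readBlockDataInternal() helper, with logging and error handling elided.

boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
    uncompressedSize, pread, doVerificationThruHBaseChecksum);
if (blk == null && doVerificationThruHBaseChecksum) {
  // HBase checksum failed: record the failure, switch this reader to FS-level
  // checksums for the next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD verified reads,
  // then retry the read once with FS-level verification.
  HFile.checksumFailures.incrementAndGet();
  is = streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
  doVerificationThruHBaseChecksum = false;
  blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL,
      uncompressedSize, pread, doVerificationThruHBaseChecksum);
}
// A successful read lets the wrapper count toward re-enabling HBase checksums.
streamWrapper.checksumOk();
return blk;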

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.io.WritableUtils;
@ -60,6 +61,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
return includesMemstoreTS;
}
/** Filesystem-level block reader. */
private HFileBlock.FSReader fsBlockReader;
/**
* A "sparse lock" implementation allowing to lock on a particular block
* identified by offset. The purpose of this is to avoid two clients loading
@ -91,27 +95,21 @@ public class HFileReaderV2 extends AbstractHFileReader {
*
* @param path Path to HFile.
* @param trailer File trailer.
* @param fsdis input stream. Caller is responsible for closing the passed
* stream.
* @param fsdis input stream.
* @param size Length of the stream.
* @param closeIStream Whether to close the stream.
* @param cacheConf Cache configuration.
* @param preferredEncodingInCache the encoding to use in cache in case we
* have a choice. If the file is already encoded on disk, we will
* still use its on-disk encoding in cache.
*/
public HFileReaderV2(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
final long size,
final boolean closeIStream, final CacheConfig cacheConf,
final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
throws IOException {
super(path, trailer, fsdis, fsdisNoFsChecksum, size,
closeIStream, cacheConf, hfs);
super(path, trailer, size, cacheConf, hfs);
trailer.expectMajorVersion(2);
validateMinorVersion(path, trailer.getMinorVersion());
HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
fsdisNoFsChecksum,
compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
this.fsBlockReader = fsBlockReaderV2; // upcast
@ -420,18 +418,16 @@ public class HFileReaderV2 extends AbstractHFileReader {
+ " block(s)");
}
}
if (closeIStream) {
if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
istreamNoFsChecksum.close();
istreamNoFsChecksum = null;
}
if (istream != null) {
istream.close();
istream = null;
}
}
fsBlockReader.closeStreams();
}
/** For testing */
@Override
HFileBlock.FSReader getUncachedBlockReader() {
return fsBlockReader;
}
protected abstract static class AbstractScannerV2
extends AbstractHFileReader.Scanner {
protected HFileBlock block;

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.MetaKeyComparator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockType;
@ -1069,12 +1070,10 @@ public class StoreFile {
bloomFilterType = BloomType.NONE;
}
public Reader(FileSystem fs, Path path, FSDataInputStream in,
final FSDataInputStream inNoChecksum, long size,
CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
boolean closeIStream) throws IOException {
reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
size, cacheConf, preferredEncodingInCache, closeIStream);
public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache) throws IOException {
reader = HFile.createReaderWithEncoding(
fs, path, in, size, cacheConf, preferredEncodingInCache);
bloomFilterType = BloomType.NONE;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@ -157,49 +158,30 @@ public class StoreFileInfo {
*/
public StoreFile.Reader open(final FileSystem fs, final CacheConfig cacheConf,
final DataBlockEncoding dataBlockEncoding) throws IOException {
FSDataInputStream inNoChecksum = null;
FileSystem noChecksumFs = null;
FSDataInputStream in;
FSDataInputStreamWrapper in;
FileStatus status;
if (fs instanceof HFileSystem) {
noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
}
if (this.reference != null) {
if (this.link != null) {
// HFileLink Reference
in = this.link.open(fs);
inNoChecksum = (noChecksumFs != null) ? this.link.open(noChecksumFs) : in;
status = this.link.getFileStatus(fs);
} else {
// HFile Reference
Path referencePath = getReferredToFile(this.getPath());
in = fs.open(referencePath);
inNoChecksum = (noChecksumFs != null) ? noChecksumFs.open(referencePath) : in;
status = fs.getFileStatus(referencePath);
}
hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
return new HalfStoreFileReader(fs, this.getPath(), in, inNoChecksum, status.getLen(),
cacheConf, reference, dataBlockEncoding);
if (this.link != null) {
// HFileLink
in = new FSDataInputStreamWrapper(fs, this.link);
status = this.link.getFileStatus(fs);
} else if (this.reference != null) {
// HFile Reference
Path referencePath = getReferredToFile(this.getPath());
in = new FSDataInputStreamWrapper(fs, referencePath);
status = fs.getFileStatus(referencePath);
} else {
in = new FSDataInputStreamWrapper(fs, this.getPath());
status = fileStatus;
}
long length = status.getLen();
if (this.reference != null) {
hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
return new HalfStoreFileReader(
fs, this.getPath(), in, length, cacheConf, reference, dataBlockEncoding);
} else {
if (this.link != null) {
// HFileLink
in = this.link.open(fs);
inNoChecksum = (noChecksumFs != null) ? link.open(noChecksumFs) : in;
status = this.link.getFileStatus(fs);
} else {
// HFile
status = fileStatus;
in = fs.open(this.getPath());
inNoChecksum = (noChecksumFs != null) ? noChecksumFs.open(this.getPath()) : in;
}
long length = status.getLen();
hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
return new StoreFile.Reader(fs, this.getPath(), in, inNoChecksum, length,
cacheConf, dataBlockEncoding, true);
return new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.util.ChecksumType;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.*;
@ -102,7 +103,7 @@ public class TestChecksum {
assertEquals(true, hfs.useHBaseChecksum());
// Do a read that purposely introduces checksum verification failures.
FSDataInputStream is = fs.open(path);
FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
HFileBlock.FSReader hbr = new FSReaderV2Test(is, algo,
totalSize, HFile.MAX_FORMAT_VERSION, fs, path);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
@ -145,7 +146,7 @@ public class TestChecksum {
// any retries within hbase.
HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
assertEquals(false, newfs.useHBaseChecksum());
is = newfs.open(path);
is = new FSDataInputStreamWrapper(newfs, path);
hbr = new FSReaderV2Test(is, algo,
totalSize, HFile.MAX_FORMAT_VERSION, newfs, path);
b = hbr.readBlockData(0, -1, -1, pread);
@ -210,8 +211,8 @@ public class TestChecksum {
// Read data back from file.
FSDataInputStream is = fs.open(path);
FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, nochecksum,
algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(
is, nochecksum), algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
@ -250,19 +251,15 @@ public class TestChecksum {
}
}
/**
* A class that introduces hbase-checksum failures while
* reading data from hfiles. This should trigger the hdfs level
* checksum validations.
*/
static private class FSReaderV2Test extends HFileBlock.FSReaderV2 {
FSReaderV2Test(FSDataInputStream istream, Algorithm algo,
long fileSize, int minorVersion, FileSystem fs,
Path path) throws IOException {
super(istream, istream, algo, fileSize, minorVersion,
(HFileSystem)fs, path);
public FSReaderV2Test(FSDataInputStreamWrapper istream, Algorithm algo, long fileSize,
int minorVersion, FileSystem fs, Path path) throws IOException {
super(istream, algo, fileSize, minorVersion, (HFileSystem)fs, path);
}
@Override

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.compress.Compressor;
@ -191,8 +192,8 @@ public class TestHFileBlockCompatibility {
os.close();
FSDataInputStream is = fs.open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, is, algo,
totalSize, MINOR_VERSION, fs, path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
algo, totalSize, MINOR_VERSION, fs, path);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
@ -204,8 +205,8 @@ public class TestHFileBlockCompatibility {
if (algo == GZ) {
is = fs.open(path);
hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION,
fs, path);
hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
algo, totalSize, MINOR_VERSION, fs, path);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
b.totalChecksumBytes(), -1, pread);
assertEquals(blockStr, b.toString());
@ -265,8 +266,8 @@ public class TestHFileBlockCompatibility {
os.close();
FSDataInputStream is = fs.open(path);
HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo,
totalSize, MINOR_VERSION, fs, path);
HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is),
algo, totalSize, MINOR_VERSION, fs, path);
hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemstoreTS(includesMemstoreTS);