[jira] [HBASE-5074] Support checksums in HBase block cache

Author: Dhruba

Summary:
HFile is enhanced to store a checksum for each block. HDFS checksum verification
is avoided while reading data into the block cache. On a checksum verification
failure, we retry the file system read request with hdfs checksums switched on
(thanks Todd).

A benchmark shows that this reduces disk iops by about 40%. In this experiment,
the entire memory on the regionserver is allocated to the regionserver's jvm and
the OS buffer cache is negligible. I also measured negligible (<5%) additional
cpu usage when hbase-level checksums are in use.

The salient points of this patch:

1. Each hfile's trailer used to have a 4 byte version number. I enhanced this so
that these 4 bytes can be interpreted as a (major version, minor version) pair.
Pre-existing hfiles have a minor version of 0. The new hfile format has a minor
version of 1 (thanks Mikhail). The hfile major version remains unchanged at 2. I
did not introduce a new major version because the code changes needed to
store/read checksums do not differ much from the existing V2 writers/readers.
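
Below is a minimal, self-contained sketch (not part of the patch) that mirrors the
materializeVersion/extractMajorVersion/extractMinorVersion helpers added to
FixedFileTrailer, showing that a version-2 file with minor version 0 serializes to the
same trailing four bytes as before:

public class VersionPackingSketch {
  // major version occupies the 3 low-order bytes, minor version the high-order byte
  static int materializeVersion(int major, int minor) {
    return (major & 0x00ffffff) | (minor << 24);
  }
  static int extractMajorVersion(int serialized) {
    return serialized & 0x00ffffff;
  }
  static int extractMinorVersion(int serialized) {
    return serialized >>> 24;
  }
  public static void main(String[] args) {
    System.out.println(materializeVersion(2, 0));       // 2 -> identical to pre-existing v2 trailers
    System.out.println(materializeVersion(2, 1));       // 16777218 -> new checksum-aware minor version
    System.out.println(extractMajorVersion(16777218));  // 2
    System.out.println(extractMinorVersion(16777218));  // 1
  }
}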

2. Introduced an HFileSystem object that encapsulates the FileSystem objects
needed to access data from hfiles and hlogs. HDFS FileSystem objects already
have the ability to switch off checksum verification for reads.
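
As a rough usage sketch (the file path is illustrative, not from the patch): the
regionserver constructs one HFileSystem and uses its checksum-free view for hfile reads,
while hlogs keep going through the fully-checksummed backing filesystem.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.fs.HFileSystem;

public class HFileSystemSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HFileSystem hfs = new HFileSystem(conf, true); // hbase-level checksums on
    // hfile reads go through the filesystem that skips hdfs checksum verification
    FSDataInputStream in = hfs.getNoChecksumFs().open(new Path("/hbase/t1/cf/somehfile"));
    // hlog traffic stays on the backing filesystem, which still verifies checksums
    FileSystem walFs = hfs.getBackingFs();
    in.close();
  }
}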

3. The majority of the code changes are located in the hbase.io.hfile package. The
retry of a read after an initial checksum failure occurs inside the hbase.io.hfile
package itself. The code changes to the hbase.regionserver package are minor.

4. The format of an hfileblock is the header, followed by the data, followed by
the checksum(s). Each 16 KB (configurable) chunk of data has a 4 byte checksum.
The hfileblock header has two additional fields: a 4 byte value to store the
bytesPerChecksum and a 4 byte value to store the size of the user data
(excluding the checksum data). This is explained in detail in the associated
javadocs.
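
For example (a sketch of the arithmetic only, mirroring ChecksumUtil.numChunks/numBytes
in this patch): a 64 KB data block with the default 16 KB bytesPerChecksum carries four
4-byte CRC values, i.e. 16 bytes of checksum data appended after the data.

public class ChecksumLayoutSketch {
  static final int CHECKSUM_SIZE = 4; // bytes per stored checksum value

  // number of checksum chunks needed to cover datasize bytes of data
  static long numChunks(long datasize, int bytesPerChecksum) {
    long chunks = datasize / bytesPerChecksum;
    if (datasize % bytesPerChecksum != 0) {
      chunks++;
    }
    return chunks;
  }

  // number of bytes of checksum data appended after the block data
  static long numBytes(long datasize, int bytesPerChecksum) {
    return numChunks(datasize, bytesPerChecksum) * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    System.out.println(numChunks(64 * 1024, 16 * 1024)); // 4
    System.out.println(numBytes(64 * 1024, 16 * 1024));  // 16
  }
}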

5. I added a test for backward compatibility. I will be writing more unit
tests that trigger checksum verification failures aggressively. I have left a
few redundant log messages in the code (just for easier debugging) and will
remove them in a later stage of this patch. I will also be adding metrics on the
number of checksum verification failures/successes in a later version of this
diff.

6. By default, hbase-level checksums are switched on and hdfs-level checksums
are switched off for hfile reads. There are no changes to the HLog code path.
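
The relevant knobs (key names taken from the HConstants changes below) can be tuned as in
the following sketch; the defaults shown in the comments are the ones this patch ships with.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ChecksumConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // set to false to fall back to hdfs-client checksum verification (default: true)
    conf.setBoolean("hbase.regionserver.checksum.verify", false);
    // bytes of data covered by each checksum value (default: 16 KB)
    conf.setInt("hbase.hstore.bytes.per.checksum", 16 * 1024);
    // checksum algorithm for newly created blocks (default: CRC32)
    conf.set("hbase.hstore.checksum.algorithm", "CRC32");
  }
}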

Test Plan: The default setting switches on hbase checksums for hfile reads, so
all existing tests exercise the new code. I will be writing more unit tests that
trigger checksum verification failures.
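
One way such tests could surface failures loudly (a sketch using the test hook added in
ChecksumUtil below; the surrounding test scaffolding is assumed, not shown):

import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;

public class ChecksumFailureTestSketch {
  public static void main(String[] args) {
    // make hbase-level checksum mismatches throw instead of silently retrying with hdfs checksums
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
    try {
      // read hfile blocks over deliberately corrupted data here
    } finally {
      ChecksumUtil.generateExceptionForChecksumFailureForTest(false);
    }
  }
}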

Reviewers: mbautin

Reviewed By: mbautin

CC: JIRA, tedyu, mbautin, dhruba, todd, stack

Differential Revision: https://reviews.facebook.net/D1521

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1298641 13f79535-47bb-0310-9956-ffa450edef68
mbautin 2012-03-08 22:55:49 +00:00
parent 17a4a3fd52
commit 961455cd9d
43 changed files with 3022 additions and 307 deletions


@ -304,7 +304,7 @@ public final class HConstants {
/** The regioninfo column qualifier */
public static final byte [] REGIONINFO_QUALIFIER =
Bytes.toBytes(REGIONINFO_QUALIFIER_STR);
/** The server column qualifier */
public static final byte [] SERVER_QUALIFIER = Bytes.toBytes("server");
@ -610,6 +610,35 @@ public final class HConstants {
/** Host name of the local machine */
public static final String LOCALHOST = "localhost";
/**
* If this parameter is set to true, then hbase will read
* data and then verify checksums. Checksum verification
* inside hdfs will be switched off. However, if the hbase-checksum
* verification fails, then it will switch back to using
* hdfs checksums for verifying data that is being read from storage.
*
* If this parameter is set to false, then hbase will not
* verify any checksums; instead, it will depend on checksum verification
* being done in the hdfs client.
*/
public static final String HBASE_CHECKSUM_VERIFICATION =
"hbase.regionserver.checksum.verify";
/**
* The name of the configuration parameter that specifies
* the number of bytes in a newly created checksum chunk.
*/
public static final String BYTES_PER_CHECKSUM =
"hbase.hstore.bytes.per.checksum";
/**
* The name of the configuration parameter that specifies
* the name of an algorithm that is used to compute checksums
* for newly created blocks.
*/
public static final String CHECKSUM_TYPE_NAME =
"hbase.hstore.checksum.algorithm";
private HConstants() {
// Can't be instantiated with this ctor.
}


@ -0,0 +1,177 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.fs;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Progressable;
/**
* An encapsulation for the FileSystem object that hbase uses to access
* data. This class allows the flexibility of using
* separate filesystem objects for reading and writing hfiles and hlogs.
* In the future, if we want to put hlogs in a different filesystem,
* this is the place to make it happen.
*/
public class HFileSystem extends FilterFileSystem {
private final FileSystem noChecksumFs; // read hfile data from storage
private final boolean useHBaseChecksum;
/**
* Create a FileSystem object for HBase regionservers.
* @param conf The configuration to be used for the filesystem
* @param useHBaseChecksum if true, then use
* checksum verification in hbase, otherwise
* delegate checksum verification to the FileSystem.
*/
public HFileSystem(Configuration conf, boolean useHBaseChecksum)
throws IOException {
// Create the default filesystem with checksum verification switched on.
// By default, any operation to this FilterFileSystem occurs on
// the underlying filesystem that has checksums switched on.
this.fs = FileSystem.get(conf);
this.useHBaseChecksum = useHBaseChecksum;
fs.initialize(getDefaultUri(conf), conf);
// If hbase checksum verification is switched on, then create a new
// filesystem object that has cksum verification turned off.
// We will avoid verifying checksums in the fs client, instead do it
// inside of hbase.
if (useHBaseChecksum) {
this.noChecksumFs = newInstanceFileSystem(conf);
this.noChecksumFs.setVerifyChecksum(false);
} else {
this.noChecksumFs = fs;
}
}
/**
* Wrap a FileSystem object within an HFileSystem. The noChecksumFs and
* writeFs are both set to be the same specified fs.
* Do not verify hbase-checksums while reading data from filesystem.
* @param fs Set the noChecksumFs and writeFs to this specified filesystem.
*/
public HFileSystem(FileSystem fs) {
this.fs = fs;
this.noChecksumFs = fs;
this.useHBaseChecksum = false;
}
/**
* Returns the filesystem that is specially setup for
* doing reads from storage. This object avoids doing
* checksum verifications for reads.
* @return The FileSystem object that can be used to read data
* from files.
*/
public FileSystem getNoChecksumFs() {
return noChecksumFs;
}
/**
* Returns the underlying filesystem
* @return The underlying FileSystem for this FilterFileSystem object.
*/
public FileSystem getBackingFs() throws IOException {
return fs;
}
/**
* Are we verifying checksums in HBase?
* @return True, if hbase is configured to verify checksums,
* otherwise false.
*/
public boolean useHBaseChecksum() {
return useHBaseChecksum;
}
/**
* Close this filesystem object
*/
@Override
public void close() throws IOException {
super.close();
if (this.noChecksumFs != fs) {
this.noChecksumFs.close();
}
}
/**
* Returns a brand new instance of the FileSystem. It does not use
* the FileSystem.Cache. In newer versions of HDFS, we can directly
* invoke FileSystem.newInstance(Configuration).
*
* @param conf Configuration
* @return A new instance of the filesystem
*/
private static FileSystem newInstanceFileSystem(Configuration conf)
throws IOException {
URI uri = FileSystem.getDefaultUri(conf);
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}
/**
* Create a new HFileSystem object, similar to FileSystem.get().
* This returns a filesystem object that avoids checksum
* verification in the filesystem for hfileblock-reads.
* For these blocks, checksum verification is done by HBase.
*/
static public FileSystem get(Configuration conf) throws IOException {
return new HFileSystem(conf, true);
}
/**
* Wrap a LocalFileSystem within an HFileSystem.
*/
static public FileSystem getLocalFs(Configuration conf) throws IOException {
return new HFileSystem(FileSystem.getLocal(conf));
}
/**
* The org.apache.hadoop.fs.FilterFileSystem does not yet support
* createNonRecursive. This is a hadoop bug and when it is fixed in Hadoop,
* this definition will go away.
*/
public FSDataOutputStream createNonRecursive(Path f,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return fs.createNonRecursive(f, overwrite, bufferSize, replication,
blockSize, progress);
}
}


@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
@ -42,9 +43,13 @@ public abstract class AbstractHFileReader extends SchemaConfigured
/** Filesystem-level block reader for this HFile format version. */
protected HFileBlock.FSReader fsBlockReader;
/** Stream to read from. */
/** Stream to read from. Does checksum verification in the file system */
protected FSDataInputStream istream;
/** The file system stream of the underlying {@link HFile} that
* does not do checksum verification in the file system */
protected FSDataInputStream istreamNoFsChecksum;
/**
* True if we should close the input stream when done. We don't close it if we
* didn't open it.
@ -99,10 +104,21 @@ public abstract class AbstractHFileReader extends SchemaConfigured
protected FileInfo fileInfo;
/** The filesystem used for accessing data */
protected HFileSystem hfs;
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long fileSize,
final boolean closeIStream,
final CacheConfig cacheConf) {
this(path, trailer, fsdis, fsdis, fileSize, closeIStream, cacheConf, null);
}
protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
final long fileSize,
final boolean closeIStream,
final CacheConfig cacheConf, final HFileSystem hfs) {
super(null, path);
this.trailer = trailer;
this.compressAlgo = trailer.getCompressionCodec();
@ -112,6 +128,8 @@ public abstract class AbstractHFileReader extends SchemaConfigured
this.closeIStream = closeIStream;
this.path = path;
this.name = path.getName();
this.hfs = hfs;
this.istreamNoFsChecksum = fsdisNoFsChecksum;
}
@SuppressWarnings("serial")
@ -343,5 +361,4 @@ public abstract class AbstractHFileReader extends SchemaConfigured
public DataBlockEncoding getEncodingOnDisk() {
return dataBlockEncoder.getEncodingOnDisk();
}
}


@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.Checksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumFactory;
import org.apache.hadoop.hbase.util.ChecksumType;
/**
* Utility methods to compute and validate checksums.
*/
public class ChecksumUtil {
/** This is used to reserve space in a byte buffer */
private static byte[] DUMMY_VALUE = new byte[128 * HFileBlock.CHECKSUM_SIZE];
/**
* This is used by unit tests to make checksum failures throw an
* exception instead of returning null. Returning a null value from
* checksum validation will cause the higher layer to retry that
* read with hdfs-level checksums. Instead, we would like checksum
* failures to cause the entire unit test to fail.
*/
private static boolean generateExceptions = false;
/**
* Generates a checksum for all the data in indata. The checksum is
* written to outdata.
* @param indata input data stream
* @param startOffset starting offset in the indata stream from which to
* compute checksums
* @param endOffset ending offset in the indata stream up to
* which checksums need to be computed
* @param outdata the output buffer where checksum values are written
* @param outOffset the starting offset in the outdata where the
* checksum values are written
* @param checksumType type of checksum
* @param bytesPerChecksum number of bytes per checksum value
*/
static void generateChecksums(byte[] indata,
int startOffset, int endOffset,
byte[] outdata, int outOffset,
ChecksumType checksumType,
int bytesPerChecksum) throws IOException {
if (checksumType == ChecksumType.NULL) {
return; // No checksums for this block.
}
Checksum checksum = checksumType.getChecksumObject();
int bytesLeft = endOffset - startOffset;
int chunkNum = 0;
while (bytesLeft > 0) {
// generate the checksum for one chunk
checksum.reset();
int count = Math.min(bytesLeft, bytesPerChecksum);
checksum.update(indata, startOffset, count);
// write the checksum value to the output buffer.
int cksumValue = (int)checksum.getValue();
outOffset = Bytes.putInt(outdata, outOffset, cksumValue);
chunkNum++;
startOffset += count;
bytesLeft -= count;
}
}
/**
* Validates that the data in the specified HFileBlock matches the
* checksum. Generates the checksum for the data and
* then validates that it matches the value stored in the header.
* If there is a checksum mismatch, then return false. Otherwise
* return true.
* The header is extracted from the specified HFileBlock while the
* data-to-be-verified is extracted from 'data'.
*/
static boolean validateBlockChecksum(Path path, HFileBlock block,
byte[] data, int hdrSize) throws IOException {
// If this is an older version of the block that does not have
// checksums, then return false indicating that checksum verification
// did not succeed. Actually, this method should never be called
// when the minorVersion is 0, thus this is a defensive check for a
// cannot-happen case. Since this is a cannot-happen case, it is
// better to return false to indicate a checksum validation failure.
if (block.getMinorVersion() < HFileBlock.MINOR_VERSION_WITH_CHECKSUM) {
return false;
}
// Get a checksum object based on the type of checksum that is
// set in the HFileBlock header. A ChecksumType.NULL indicates that
// the caller is not interested in validating checksums, so we
// always return true.
ChecksumType cktype = ChecksumType.codeToType(block.getChecksumType());
if (cktype == ChecksumType.NULL) {
return true; // No checksum validations needed for this block.
}
Checksum checksumObject = cktype.getChecksumObject();
checksumObject.reset();
// read in the stored value of the checksum size from the header.
int bytesPerChecksum = block.getBytesPerChecksum();
// bytesPerChecksum is always larger than the size of the header
if (bytesPerChecksum < hdrSize) {
String msg = "Unsupported value of bytesPerChecksum. " +
" Minimum is " + hdrSize +
" but the configured value is " + bytesPerChecksum;
HFile.LOG.warn(msg);
return false; // cannot happen case, unable to verify checksum
}
// Extract the header and compute checksum for the header.
ByteBuffer hdr = block.getBufferWithHeader();
checksumObject.update(hdr.array(), hdr.arrayOffset(), hdrSize);
int off = hdrSize;
int consumed = hdrSize;
int bytesLeft = block.getOnDiskDataSizeWithHeader() - off;
int cksumOffset = block.getOnDiskDataSizeWithHeader();
// validate each chunk
while (bytesLeft > 0) {
int thisChunkSize = bytesPerChecksum - consumed;
int count = Math.min(bytesLeft, thisChunkSize);
checksumObject.update(data, off, count);
int storedChecksum = Bytes.toInt(data, cksumOffset);
if (storedChecksum != (int)checksumObject.getValue()) {
String msg = "File " + path +
" Stored checksum value of " + storedChecksum +
" at offset " + cksumOffset +
" does not match computed checksum " +
checksumObject.getValue() +
", total data size " + data.length +
" Checksum data range offset " + off + " len " + count +
HFileBlock.toStringHeader(block.getBufferReadOnly());
HFile.LOG.warn(msg);
if (generateExceptions) {
throw new IOException(msg); // this is only for unit tests
} else {
return false; // checksum validation failure
}
}
cksumOffset += HFileBlock.CHECKSUM_SIZE;
bytesLeft -= count;
off += count;
consumed = 0;
checksumObject.reset();
}
return true; // checksum is valid
}
/**
* Returns the number of bytes needed to store the checksums for
* a specified data size
* @param datasize number of bytes of data
* @param bytesPerChecksum number of bytes in a checksum chunk
* @return The number of bytes needed to store the checksum values
*/
static long numBytes(long datasize, int bytesPerChecksum) {
return numChunks(datasize, bytesPerChecksum) *
HFileBlock.CHECKSUM_SIZE;
}
/**
* Returns the number of checksum chunks needed to store the checksums for
* a specified data size
* @param datasize number of bytes of data
* @param bytesPerChecksum number of bytes in a checksum chunk
* @return The number of checksum chunks
*/
static long numChunks(long datasize, int bytesPerChecksum) {
long numChunks = datasize/bytesPerChecksum;
if (datasize % bytesPerChecksum != 0) {
numChunks++;
}
return numChunks;
}
/**
* Write dummy checksums to the end of the specified byte array
* to reserve space for writing checksums later
* @param baos OutputStream to write dummy checksum values
* @param numBytes Number of bytes of data for which dummy checksums
* need to be generated
* @param bytesPerChecksum Number of bytes per checksum value
*/
static void reserveSpaceForChecksums(ByteArrayOutputStream baos,
int numBytes, int bytesPerChecksum) throws IOException {
long numChunks = numChunks(numBytes, bytesPerChecksum);
long bytesLeft = numChunks * HFileBlock.CHECKSUM_SIZE;
while (bytesLeft > 0) {
long count = Math.min(bytesLeft, DUMMY_VALUE.length);
baos.write(DUMMY_VALUE, 0, (int)count);
bytesLeft -= count;
}
}
/**
* Mechanism to throw an exception in case of hbase checksum
* failure. This is used by unit tests only.
* @param value Setting this to true will cause hbase checksum
* verification failures to generate exceptions.
*/
public static void generateExceptionForChecksumFailureForTest(boolean value) {
generateExceptions = value;
}
}


@ -44,6 +44,13 @@ import com.google.common.io.NullOutputStream;
* variable parts of the file. Also includes basic metadata on this file. The
* trailer size is fixed within a given {@link HFile} format version only, but
* we always store the version number as the last four-byte integer of the file.
* The version number itself is split into two portions, a major
* version and a minor version.
* The last three bytes of the file are the major
* version and the single preceding byte is the minor version. The major version
* determines which readers/writers to use to read/write an hfile, while the minor
* version captures smaller changes in the hfile format that do not need a new
* reader/writer type.
*/
@InterfaceAudience.Private
public class FixedFileTrailer {
@ -108,12 +115,16 @@ public class FixedFileTrailer {
/** Raw key comparator class name in version 2 */
private String comparatorClassName = RawComparator.class.getName();
/** The {@link HFile} format version. */
private final int version;
/** The {@link HFile} format major version. */
private final int majorVersion;
FixedFileTrailer(int version) {
this.version = version;
HFile.checkFormatVersion(version);
/** The {@link HFile} format minor version. */
private final int minorVersion;
FixedFileTrailer(int majorVersion, int minorVersion) {
this.majorVersion = majorVersion;
this.minorVersion = minorVersion;
HFile.checkFormatVersion(majorVersion);
}
private static int[] computeTrailerSizeByVersion() {
@ -121,7 +132,8 @@ public class FixedFileTrailer {
for (int version = MIN_FORMAT_VERSION;
version <= MAX_FORMAT_VERSION;
++version) {
FixedFileTrailer fft = new FixedFileTrailer(version);
FixedFileTrailer fft = new FixedFileTrailer(version,
HFileBlock.MINOR_VERSION_NO_CHECKSUM);
DataOutputStream dos = new DataOutputStream(new NullOutputStream());
try {
fft.serialize(dos);
@ -151,7 +163,7 @@ public class FixedFileTrailer {
}
public int getTrailerSize() {
return getTrailerSize(version);
return getTrailerSize(majorVersion);
}
/**
@ -163,7 +175,7 @@ public class FixedFileTrailer {
* @throws IOException
*/
void serialize(DataOutputStream outputStream) throws IOException {
HFile.checkFormatVersion(version);
HFile.checkFormatVersion(majorVersion);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput baosDos = new DataOutputStream(baos);
@ -173,7 +185,7 @@ public class FixedFileTrailer {
baosDos.writeLong(loadOnOpenDataOffset);
baosDos.writeInt(dataIndexCount);
if (version == 1) {
if (majorVersion == 1) {
// This used to be metaIndexOffset, but it was not used in version 1.
baosDos.writeLong(0);
} else {
@ -182,7 +194,7 @@ public class FixedFileTrailer {
baosDos.writeInt(metaIndexCount);
baosDos.writeLong(totalUncompressedBytes);
if (version == 1) {
if (majorVersion == 1) {
baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
} else {
// This field is long from version 2 onwards.
@ -190,14 +202,16 @@ public class FixedFileTrailer {
}
baosDos.writeInt(compressionCodec.ordinal());
if (version > 1) {
if (majorVersion > 1) {
baosDos.writeInt(numDataIndexLevels);
baosDos.writeLong(firstDataBlockOffset);
baosDos.writeLong(lastDataBlockOffset);
Bytes.writeStringFixedSize(baosDos, comparatorClassName,
MAX_COMPARATOR_NAME_LENGTH);
}
baosDos.writeInt(version);
// serialize the major and minor versions
baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
outputStream.write(baos.toByteArray());
}
@ -212,7 +226,7 @@ public class FixedFileTrailer {
* @throws IOException
*/
void deserialize(DataInputStream inputStream) throws IOException {
HFile.checkFormatVersion(version);
HFile.checkFormatVersion(majorVersion);
BlockType.TRAILER.readAndCheck(inputStream);
@ -220,7 +234,7 @@ public class FixedFileTrailer {
loadOnOpenDataOffset = inputStream.readLong();
dataIndexCount = inputStream.readInt();
if (version == 1) {
if (majorVersion == 1) {
inputStream.readLong(); // Read and skip metaIndexOffset.
} else {
uncompressedDataIndexSize = inputStream.readLong();
@ -228,9 +242,9 @@ public class FixedFileTrailer {
metaIndexCount = inputStream.readInt();
totalUncompressedBytes = inputStream.readLong();
entryCount = version == 1 ? inputStream.readInt() : inputStream.readLong();
entryCount = majorVersion == 1 ? inputStream.readInt() : inputStream.readLong();
compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
if (version > 1) {
if (majorVersion > 1) {
numDataIndexLevels = inputStream.readInt();
firstDataBlockOffset = inputStream.readLong();
lastDataBlockOffset = inputStream.readLong();
@ -238,7 +252,9 @@ public class FixedFileTrailer {
Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
}
expectVersion(inputStream.readInt());
int version = inputStream.readInt();
expectMajorVersion(extractMajorVersion(version));
expectMinorVersion(extractMinorVersion(version));
}
private void append(StringBuilder sb, String s) {
@ -257,14 +273,15 @@ public class FixedFileTrailer {
append(sb, "totalUncomressedBytes=" + totalUncompressedBytes);
append(sb, "entryCount=" + entryCount);
append(sb, "compressionCodec=" + compressionCodec);
if (version == 2) {
if (majorVersion == 2) {
append(sb, "uncompressedDataIndexSize=" + uncompressedDataIndexSize);
append(sb, "numDataIndexLevels=" + numDataIndexLevels);
append(sb, "firstDataBlockOffset=" + firstDataBlockOffset);
append(sb, "lastDataBlockOffset=" + lastDataBlockOffset);
append(sb, "comparatorClassName=" + comparatorClassName);
}
append(sb, "version=" + version);
append(sb, "majorVersion=" + majorVersion);
append(sb, "minorVersion=" + minorVersion);
return sb.toString();
}
@ -301,31 +318,44 @@ public class FixedFileTrailer {
buf.position(buf.limit() - Bytes.SIZEOF_INT);
int version = buf.getInt();
// Extract the major and minor versions.
int majorVersion = extractMajorVersion(version);
int minorVersion = extractMinorVersion(version);
try {
HFile.checkFormatVersion(version);
HFile.checkFormatVersion(majorVersion);
} catch (IllegalArgumentException iae) {
// In this context, an invalid version might indicate a corrupt HFile.
throw new IOException(iae);
}
int trailerSize = getTrailerSize(version);
int trailerSize = getTrailerSize(majorVersion);
FixedFileTrailer fft = new FixedFileTrailer(version);
FixedFileTrailer fft = new FixedFileTrailer(majorVersion, minorVersion);
fft.deserialize(new DataInputStream(new ByteArrayInputStream(buf.array(),
buf.arrayOffset() + bufferSize - trailerSize, trailerSize)));
return fft;
}
public void expectVersion(int expected) {
if (version != expected) {
throw new IllegalArgumentException("Invalid HFile version: " + version
public void expectMajorVersion(int expected) {
if (majorVersion != expected) {
throw new IllegalArgumentException("Invalid HFile major version: "
+ majorVersion
+ " (expected: " + expected + ")");
}
}
public void expectAtLeastVersion(int lowerBound) {
if (version < lowerBound) {
throw new IllegalArgumentException("Invalid HFile version: " + version
public void expectMinorVersion(int expected) {
if (minorVersion != expected) {
throw new IllegalArgumentException("Invalid HFile minor version: "
+ minorVersion + " (expected: " + expected + ")");
}
}
public void expectAtLeastMajorVersion(int lowerBound) {
if (majorVersion < lowerBound) {
throw new IllegalArgumentException("Invalid HFile major version: "
+ majorVersion
+ " (expected: " + lowerBound + " or higher).");
}
}
@ -375,11 +405,11 @@ public class FixedFileTrailer {
}
public void setEntryCount(long newEntryCount) {
if (version == 1) {
if (majorVersion == 1) {
int intEntryCount = (int) Math.min(Integer.MAX_VALUE, newEntryCount);
if (intEntryCount != newEntryCount) {
LOG.info("Warning: entry count is " + newEntryCount + " but writing "
+ intEntryCount + " into the version " + version + " trailer");
+ intEntryCount + " into the version " + majorVersion + " trailer");
}
entryCount = intEntryCount;
return;
@ -396,42 +426,52 @@ public class FixedFileTrailer {
}
public int getNumDataIndexLevels() {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
return numDataIndexLevels;
}
public void setNumDataIndexLevels(int numDataIndexLevels) {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
this.numDataIndexLevels = numDataIndexLevels;
}
public long getLastDataBlockOffset() {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
return lastDataBlockOffset;
}
public void setLastDataBlockOffset(long lastDataBlockOffset) {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
this.lastDataBlockOffset = lastDataBlockOffset;
}
public long getFirstDataBlockOffset() {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
return firstDataBlockOffset;
}
public void setFirstDataBlockOffset(long firstDataBlockOffset) {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
this.firstDataBlockOffset = firstDataBlockOffset;
}
public int getVersion() {
return version;
/**
* Returns the major version of this HFile format
*/
public int getMajorVersion() {
return majorVersion;
}
/**
* Returns the minor version of this HFile format
*/
int getMinorVersion() {
return minorVersion;
}
@SuppressWarnings("rawtypes")
public void setComparatorClass(Class<? extends RawComparator> klass) {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
comparatorClassName = klass.getName();
}
@ -458,20 +498,43 @@ public class FixedFileTrailer {
}
RawComparator<byte[]> createComparator() throws IOException {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
return createComparator(comparatorClassName);
}
public long getUncompressedDataIndexSize() {
if (version == 1)
if (majorVersion == 1)
return 0;
return uncompressedDataIndexSize;
}
public void setUncompressedDataIndexSize(
long uncompressedDataIndexSize) {
expectAtLeastVersion(2);
expectAtLeastMajorVersion(2);
this.uncompressedDataIndexSize = uncompressedDataIndexSize;
}
/**
* Extracts the major version from the 4-byte serialized version data.
* The major version is the 3 least significant bytes
*/
private static int extractMajorVersion(int serializedVersion) {
return (serializedVersion & 0x00ffffff);
}
/**
* Extracts the minor version from the 4-byte serialized version data.
* The minor version is the most significant byte
*/
private static int extractMinorVersion(int serializedVersion) {
return (serializedVersion >>> 24);
}
/**
* Create a 4 byte serialized version number by combining the
* minor and major version numbers.
*/
private static int materializeVersion(int majorVersion, int minorVersion) {
return ((majorVersion & 0x00ffffff) | (minorVersion << 24));
}
}


@ -43,10 +43,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -156,6 +158,12 @@ public class HFile {
*/
public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;
/**
* The number of bytes per checksum.
*/
public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
// For measuring latency of "sequential" reads and writes
static final AtomicInteger readOps = new AtomicInteger();
static final AtomicLong readTimeNano = new AtomicLong();
@ -166,6 +174,9 @@ public class HFile {
static final AtomicInteger preadOps = new AtomicInteger();
static final AtomicLong preadTimeNano = new AtomicLong();
// For measuring number of checksum failures
static final AtomicLong checksumFailures = new AtomicLong();
// for test purpose
public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
@ -195,6 +206,14 @@ public class HFile {
return writeTimeNano.getAndSet(0) / 1000000;
}
/**
* Number of checksum verification failures. It also
* clears the counter.
*/
public static final long getChecksumFailuresCount() {
return checksumFailures.getAndSet(0);
}
/** API required to write an {@link HFile} */
public interface Writer extends Closeable {
@ -247,6 +266,8 @@ public class HFile {
HFile.DEFAULT_COMPRESSION_ALGORITHM;
protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
protected KeyComparator comparator;
protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
WriterFactory(Configuration conf, CacheConfig cacheConf) {
this.conf = conf;
@ -296,6 +317,17 @@ public class HFile {
return this;
}
public WriterFactory withChecksumType(ChecksumType checksumType) {
Preconditions.checkNotNull(checksumType);
this.checksumType = checksumType;
return this;
}
public WriterFactory withBytesPerChecksum(int bytesPerChecksum) {
this.bytesPerChecksum = bytesPerChecksum;
return this;
}
public Writer create() throws IOException {
if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
throw new AssertionError("Please specify exactly one of " +
@ -305,14 +337,15 @@ public class HFile {
ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
}
return createWriter(fs, path, ostream, blockSize,
compression, encoder, comparator);
compression, encoder, comparator, checksumType, bytesPerChecksum);
}
protected abstract Writer createWriter(FileSystem fs, Path path,
FSDataOutputStream ostream, int blockSize,
Compression.Algorithm compress,
HFileDataBlockEncoder dataBlockEncoder,
KeyComparator comparator) throws IOException;
KeyComparator comparator, ChecksumType checksumType,
int bytesPerChecksum) throws IOException;
}
/** The configuration key for HFile version to use for new files */
@ -431,20 +464,22 @@ public class HFile {
}
private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
FSDataInputStream fsdisNoFsChecksum,
long size, boolean closeIStream, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache)
DataBlockEncoding preferredEncodingInCache, HFileSystem hfs)
throws IOException {
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
switch (trailer.getVersion()) {
switch (trailer.getMajorVersion()) {
case 1:
return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
cacheConf);
case 2:
return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
cacheConf, preferredEncodingInCache);
return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
size, closeIStream,
cacheConf, preferredEncodingInCache, hfs);
default:
throw new IOException("Cannot instantiate reader for HFile version " +
trailer.getVersion());
trailer.getMajorVersion());
}
}
@ -452,9 +487,26 @@ public class HFile {
FileSystem fs, Path path, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache) throws IOException {
final boolean closeIStream = true;
return pickReaderVersion(path, fs.open(path),
HFileSystem hfs = null;
FSDataInputStream fsdis = fs.open(path);
FSDataInputStream fsdisNoFsChecksum = fsdis;
// If the fs is not an instance of HFileSystem, then create an
// instance of HFileSystem that wraps over the specified fs.
// In this case, we will not be able to avoid checksumming inside
// the filesystem.
if (!(fs instanceof HFileSystem)) {
hfs = new HFileSystem(fs);
} else {
hfs = (HFileSystem)fs;
// open a stream to read data without checksum verification in
// the filesystem
if (hfs != null) {
fsdisNoFsChecksum = hfs.getNoChecksumFs().open(path);
}
}
return pickReaderVersion(path, fsdis, fsdisNoFsChecksum,
fs.getFileStatus(path).getLen(), closeIStream, cacheConf,
preferredEncodingInCache);
preferredEncodingInCache, hfs);
}
public static Reader createReader(
@ -463,12 +515,15 @@ public class HFile {
DataBlockEncoding.NONE);
}
public static Reader createReaderFromStream(Path path,
/**
* This factory method is used only by unit tests
*/
static Reader createReaderFromStream(Path path,
FSDataInputStream fsdis, long size, CacheConfig cacheConf)
throws IOException {
final boolean closeIStream = false;
return pickReaderVersion(path, fsdis, size, closeIStream, cacheConf,
DataBlockEncoding.NONE);
return pickReaderVersion(path, fsdis, fsdis, size, closeIStream, cacheConf,
DataBlockEncoding.NONE, null);
}
/*


@ -51,11 +51,12 @@ public interface HFileDataBlockEncoder {
* Should be called before an encoded or unencoded data block is written to
* disk.
* @param in KeyValues next to each other
* @param dummyHeader A dummy header to be written as a placeholder
* @return a non-null on-heap buffer containing the contents of the
* HFileBlock with unfilled header and block type
*/
public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
ByteBuffer in, boolean includesMemstoreTS);
ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader);
/**
* Decides whether we should use a scanner over encoded blocks.


@ -154,14 +154,14 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
*/
@Override
public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
boolean includesMemstoreTS) {
boolean includesMemstoreTS, byte[] dummyHeader) {
if (onDisk == DataBlockEncoding.NONE) {
// there is no need to encode the block before writing it to disk
return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
}
ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in,
onDisk, includesMemstoreTS);
onDisk, includesMemstoreTS, dummyHeader);
return new Pair<ByteBuffer, BlockType>(encodedBuffer,
BlockType.ENCODED_DATA);
}
@ -175,12 +175,13 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
}
private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in,
DataBlockEncoding algo, boolean includesMemstoreTS) {
DataBlockEncoding algo, boolean includesMemstoreTS,
byte[] dummyHeader) {
ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(encodedStream);
DataBlockEncoder encoder = algo.getEncoder();
try {
encodedStream.write(HFileBlock.DUMMY_HEADER);
encodedStream.write(dummyHeader);
algo.writeIdInBytes(dataOut);
encoder.compressKeyValues(dataOut, in,
includesMemstoreTS);
@ -194,13 +195,16 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
private HFileBlock encodeDataBlock(HFileBlock block,
DataBlockEncoding algo, boolean includesMemstoreTS) {
ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer(
block.getBufferWithoutHeader(), algo, includesMemstoreTS);
int sizeWithoutHeader = compressedBuffer.limit() - HFileBlock.HEADER_SIZE;
block.getBufferWithoutHeader(), algo, includesMemstoreTS,
HFileBlock.DUMMY_HEADER);
int sizeWithoutHeader = compressedBuffer.limit() - block.headerSize();
HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA,
block.getOnDiskSizeWithoutHeader(),
sizeWithoutHeader, block.getPrevBlockOffset(),
compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(),
includesMemstoreTS);
includesMemstoreTS, block.getMinorVersion(),
block.getBytesPerChecksum(), block.getChecksumType(),
block.getOnDiskDataSizeWithHeader());
block.passSchemaMetricsTo(encodedBlock);
return encodedBlock;
}


@ -65,10 +65,10 @@ public class HFileReaderV1 extends AbstractHFileReader {
public HFileReaderV1(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long size,
final boolean closeIStream,
final CacheConfig cacheConf) {
final CacheConfig cacheConf) throws IOException {
super(path, trailer, fsdis, size, closeIStream, cacheConf);
trailer.expectVersion(1);
trailer.expectMajorVersion(1);
fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
}


@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
@ -73,6 +74,12 @@ public class HFileReaderV2 extends AbstractHFileReader {
*/
private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
/** Minimum minor version supported by this HFile format */
static final int MIN_MINOR_VERSION = 0;
/** Maximum minor version supported by this HFile format */
static final int MAX_MINOR_VERSION = 1;
/**
* Opens a HFile. You must load the index before you can use it by calling
* {@link #loadFileInfo()}.
@ -89,14 +96,18 @@ public class HFileReaderV2 extends AbstractHFileReader {
* still use its on-disk encoding in cache.
*/
public HFileReaderV2(Path path, FixedFileTrailer trailer,
final FSDataInputStream fsdis, final long size,
final FSDataInputStream fsdis, final FSDataInputStream fsdisNoFsChecksum,
final long size,
final boolean closeIStream, final CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache)
DataBlockEncoding preferredEncodingInCache, final HFileSystem hfs)
throws IOException {
super(path, trailer, fsdis, size, closeIStream, cacheConf);
trailer.expectVersion(2);
super(path, trailer, fsdis, fsdisNoFsChecksum, size,
closeIStream, cacheConf, hfs);
trailer.expectMajorVersion(2);
validateMinorVersion(path, trailer.getMinorVersion());
HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
compressAlgo, fileSize);
fsdisNoFsChecksum,
compressAlgo, fileSize, trailer.getMinorVersion(), hfs, path);
this.fsBlockReader = fsBlockReaderV2; // upcast
// Comparator class name is stored in the trailer in version 2.
@ -411,9 +422,15 @@ public class HFileReaderV2 extends AbstractHFileReader {
+ " block(s)");
}
}
if (closeIStream && istream != null) {
istream.close();
istream = null;
if (closeIStream) {
if (istream != istreamNoFsChecksum && istreamNoFsChecksum != null) {
istreamNoFsChecksum.close();
istreamNoFsChecksum = null;
}
if (istream != null) {
istream.close();
istream = null;
}
}
}
@ -915,9 +932,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
ByteBuffer origBlock = newBlock.getBufferReadOnly();
ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
origBlock.arrayOffset() + HFileBlock.HEADER_SIZE +
origBlock.arrayOffset() + newBlock.headerSize() +
DataBlockEncoding.ID_SIZE,
origBlock.limit() - HFileBlock.HEADER_SIZE -
newBlock.getUncompressedSizeWithoutHeader() -
DataBlockEncoding.ID_SIZE).slice();
return encodedBlock;
}
@ -1053,4 +1070,19 @@ public class HFileReaderV2 extends AbstractHFileReader {
return true; // We load file info in constructor in version 2.
}
/**
* Validates that the minor version is within acceptable limits.
* Otherwise throws a RuntimeException
*/
private void validateMinorVersion(Path path, int minorVersion) {
if (minorVersion < MIN_MINOR_VERSION ||
minorVersion > MAX_MINOR_VERSION) {
String msg = "Minor version for path " + path +
" is expected to be between " +
MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
" but is found to be " + minorVersion;
LOG.error(msg);
throw new RuntimeException(msg);
}
}
}


@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@ -92,8 +93,9 @@ public class HFileWriterV1 extends AbstractHFileWriter {
public Writer createWriter(FileSystem fs, Path path,
FSDataOutputStream ostream, int blockSize,
Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
KeyComparator comparator)
throws IOException {
KeyComparator comparator, final ChecksumType checksumType,
final int bytesPerChecksum) throws IOException {
// version 1 does not implement checksums
return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
compressAlgo, dataBlockEncoder, comparator);
}
@ -149,7 +151,13 @@ public class HFileWriterV1 extends AbstractHFileWriter {
HFileBlock block = new HFileBlock(BlockType.DATA,
(int) (outputStream.getPos() - blockBegin), bytes.length, -1,
ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
blockBegin, MemStore.NO_PERSISTENT_TS);
blockBegin, MemStore.NO_PERSISTENT_TS,
HFileBlock.MINOR_VERSION_NO_CHECKSUM, // minor version
0, // bytesPerChecksum
ChecksumType.NULL.getCode(), // checksum type
(int) (outputStream.getPos() - blockBegin) +
HFileBlock.HEADER_SIZE_NO_CHECKSUM); // onDiskDataSizeWithHeader
block = blockEncoder.diskToCacheFormat(block, false);
passSchemaMetricsTo(block);
cacheConf.getBlockCache().cacheBlock(
@ -174,7 +182,7 @@ public class HFileWriterV1 extends AbstractHFileWriter {
if (cacheConf.shouldCacheDataOnWrite()) {
this.baos = new ByteArrayOutputStream();
this.baosDos = new DataOutputStream(baos);
baosDos.write(HFileBlock.DUMMY_HEADER);
baosDos.write(HFileBlock.DUMMY_HEADER_NO_CHECKSUM);
}
}
@ -332,7 +340,8 @@ public class HFileWriterV1 extends AbstractHFileWriter {
finishBlock();
FixedFileTrailer trailer = new FixedFileTrailer(1);
FixedFileTrailer trailer = new FixedFileTrailer(1,
HFileBlock.MINOR_VERSION_NO_CHECKSUM);
// Write out the metadata blocks if any.
ArrayList<Long> metaOffsets = null;


@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@ -81,6 +82,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
private List<BlockWritable> additionalLoadOnOpenData =
new ArrayList<BlockWritable>();
/** Checksum related settings */
private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
private final boolean includeMemstoreTS = true;
private long maxMemstoreTS = 0;
@ -93,9 +98,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
public Writer createWriter(FileSystem fs, Path path,
FSDataOutputStream ostream, int blockSize,
Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException {
final KeyComparator comparator, final ChecksumType checksumType,
final int bytesPerChecksum) throws IOException {
return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
compress, blockEncoder, comparator);
compress, blockEncoder, comparator, checksumType, bytesPerChecksum);
}
}
@ -103,11 +109,14 @@ public class HFileWriterV2 extends AbstractHFileWriter {
public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException {
final KeyComparator comparator, final ChecksumType checksumType,
final int bytesPerChecksum) throws IOException {
super(cacheConf,
ostream == null ? createOutputStream(conf, fs, path) : ostream,
path, blockSize, compressAlgo, blockEncoder, comparator);
SchemaMetrics.configureGlobally(conf);
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
finishInit(conf);
}
@ -118,7 +127,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
// HFile filesystem-level (non-caching) block writer
fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
includeMemstoreTS);
includeMemstoreTS, checksumType, bytesPerChecksum);
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
@ -356,7 +365,8 @@ public class HFileWriterV2 extends AbstractHFileWriter {
finishBlock();
writeInlineBlocks(true);
FixedFileTrailer trailer = new FixedFileTrailer(2);
FixedFileTrailer trailer = new FixedFileTrailer(2,
HFileReaderV2.MAX_MINOR_VERSION);
// Write out the metadata blocks if any.
if (!metaNames.isEmpty()) {


@ -46,7 +46,7 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
@Override
public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
ByteBuffer in, boolean includesMemstoreTS) {
ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader) {
return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
}


@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
@ -553,6 +554,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
.withCompression(compression)
.withDataBlockEncoder(dataBlockEncoder)
.withBloomType(bloomFilterType)
.withChecksumType(Store.getChecksumType(conf))
.withBytesPerChecksum(Store.getBytesPerChecksum(conf))
.build();
HFileScanner scanner = halfReader.getScanner(false, false, false);
scanner.seekTo();


@ -3624,7 +3624,14 @@ public class HRegion implements HeapSize { // , Writable{
}
Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
info.getTableName());
HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
FileSystem fs = null;
if (rsServices != null) {
fs = rsServices.getFileSystem();
}
if (fs == null) {
fs = FileSystem.get(conf);
}
HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info,
htd, rsServices);
return r.openHRegion(reporter);
}


@ -106,6 +106,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -195,7 +196,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
protected final Configuration conf;
protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
private FileSystem fs;
private HFileSystem fs;
private boolean useHBaseChecksum; // verify hbase checksums?
private Path rootDir;
private final Random rand = new Random();
@ -368,6 +370,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.isOnline = false;
checkCodecs(this.conf);
// do we use checksum verification in hbase? If hbase checksum verification
// is enabled, then we automatically switch off hdfs checksum verification.
this.useHBaseChecksum = conf.getBoolean(
HConstants.HBASE_CHECKSUM_VERIFICATION, true);
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
@ -978,7 +985,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// to defaults).
this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
// Get fs instance used by this RS
this.fs = FileSystem.get(this.conf);
this.fs = new HFileSystem(this.conf, this.useHBaseChecksum);
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
this.hlog = setupWALAndReplication();
@ -1278,7 +1285,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
* @throws IOException
*/
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
return new HLog(this.fs, logdir, oldLogDir, this.conf,
return new HLog(this.fs.getBackingFs(), logdir, oldLogDir, this.conf,
getWALActionListeners(), this.serverNameFromMasterPOV.toString());
}
@ -3165,7 +3172,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
/**
* @return Return the fs.
*/
protected FileSystem getFileSystem() {
public FileSystem getFileSystem() {
return fs;
}


@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -80,5 +81,9 @@ public interface RegionServerServices extends OnlineRegions {
* @return map of regions in transition in this RS
*/
public Map<byte[], Boolean> getRegionsInTransitionInRS();
/**
* @return Return the FileSystem object used by the regionserver
*/
public FileSystem getFileSystem();
}


@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -157,6 +158,10 @@ public class Store extends SchemaConfigured implements HeapSize {
private final Compression.Algorithm compactionCompression;
private HFileDataBlockEncoder dataBlockEncoder;
/** Checksum configuration */
private ChecksumType checksumType;
private int bytesPerChecksum;
// Comparing KeyValues
final KeyValue.KVComparator comparator;
@ -246,6 +251,35 @@ public class Store extends SchemaConfigured implements HeapSize {
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
this.storefiles = sortAndClone(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
this.checksumType = getChecksumType(conf);
// initialize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
}
/**
* Returns the configured bytesPerChecksum value.
* @param conf The configuration
* @return The bytesPerChecksum that is set in the configuration
*/
public static int getBytesPerChecksum(Configuration conf) {
return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
}
/**
* Returns the configured checksum algorithm.
* @param conf The configuration
* @return The checksum algorithm that is set in the configuration
*/
public static ChecksumType getChecksumType(Configuration conf) {
String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
if (checksumName == null) {
return HFile.DEFAULT_CHECKSUM_TYPE;
} else {
return ChecksumType.nameToType(checksumName);
}
}
public HColumnDescriptor getFamily() {
@ -799,6 +833,8 @@ public class Store extends SchemaConfigured implements HeapSize {
.withComparator(comparator)
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount)
.withChecksumType(checksumType)
.withBytesPerChecksum(bytesPerChecksum)
.build();
// The store file writer's path does not include the CF name, so we need
// to configure the HFile writer directly.
@ -2192,8 +2228,8 @@ public class Store extends SchemaConfigured implements HeapSize {
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+ (19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
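
The two static helpers added above are how Store resolves the new checksum settings from the cluster configuration. A minimal sketch of exercising them outside of Store (not part of the patch; HBaseConfiguration is assumed only to obtain a Configuration, the constant names come from the hunks above, and the concrete values are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.regionserver.Store;
  import org.apache.hadoop.hbase.util.ChecksumType;

  public class ChecksumConfigExample {
    public static void main(String[] args) {
      // Resolve the checksum settings the same way the Store constructor does.
      Configuration conf = HBaseConfiguration.create();
      conf.set(HConstants.CHECKSUM_TYPE_NAME, "CRC32C");      // must match a ChecksumType name
      conf.setInt(HConstants.BYTES_PER_CHECKSUM, 16 * 1024);  // one 4-byte checksum per 16 KB of data
      ChecksumType type = Store.getChecksumType(conf);        // unset -> HFile.DEFAULT_CHECKSUM_TYPE
      int bytesPerChecksum = Store.getBytesPerChecksum(conf); // unset -> HFile.DEFAULT_BYTES_PER_CHECKSUM
      System.out.println(type.getName() + " every " + bytesPerChecksum + " bytes");
    }
  }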

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@ -699,6 +700,8 @@ public class StoreFile extends SchemaConfigured {
private long maxKeyCount = 0;
private Path dir;
private Path filePath;
private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
public WriterBuilder(Configuration conf, CacheConfig cacheConf,
FileSystem fs, int blockSize) {
@ -765,6 +768,24 @@ public class StoreFile extends SchemaConfigured {
return this;
}
/**
* @param checksumType the type of checksum
* @return this (for chained invocation)
*/
public WriterBuilder withChecksumType(ChecksumType checksumType) {
this.checksumType = checksumType;
return this;
}
/**
* @param bytesPerChecksum the number of bytes per checksum chunk
* @return this (for chained invocation)
*/
public WriterBuilder withBytesPerChecksum(int bytesPerChecksum) {
this.bytesPerChecksum = bytesPerChecksum;
return this;
}
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@ -798,7 +819,8 @@ public class StoreFile extends SchemaConfigured {
comparator = KeyValue.COMPARATOR;
}
return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
conf, cacheConf, comparator, bloomType, maxKeyCount);
conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
bytesPerChecksum);
}
}
@ -896,6 +918,12 @@ public class StoreFile extends SchemaConfigured {
protected HFileDataBlockEncoder dataBlockEncoder;
/** Checksum type */
protected ChecksumType checksumType;
/** Bytes per Checksum */
protected int bytesPerChecksum;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
* When flushing a memstore, we set TimeRange and use this variable to
@ -918,13 +946,16 @@ public class StoreFile extends SchemaConfigured {
* @param bloomType bloom filter setting
* @param maxKeys the expected maximum number of keys to be added. Was used
* for Bloom filter size in {@link HFile} format version 1.
* @param checksumType the checksum type
* @param bytesPerChecksum the number of bytes per checksum value
* @throws IOException problem writing to FS
*/
private Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress,
HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
CacheConfig cacheConf,
final KVComparator comparator, BloomType bloomType, long maxKeys)
final KVComparator comparator, BloomType bloomType, long maxKeys,
final ChecksumType checksumType, final int bytesPerChecksum)
throws IOException {
this.dataBlockEncoder = dataBlockEncoder != null ?
dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
@ -934,6 +965,8 @@ public class StoreFile extends SchemaConfigured {
.withCompression(compress)
.withDataBlockEncoder(dataBlockEncoder)
.withComparator(comparator.getRawComparator())
.withChecksumType(checksumType)
.withBytesPerChecksum(bytesPerChecksum)
.create();
this.kvComparator = comparator;
@ -964,6 +997,8 @@ public class StoreFile extends SchemaConfigured {
LOG.info("Delete Family Bloom filter type for " + path + ": "
+ deleteFamilyBloomFilterWriter.getClass().getSimpleName());
}
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
}
/**
@ -1660,7 +1695,7 @@ public class StoreFile extends SchemaConfigured {
}
public int getHFileVersion() {
return reader.getTrailer().getVersion();
return reader.getTrailer().getMajorVersion();
}
HFile.Reader getHFileReader() {
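
The two new WriterBuilder setters chain like the existing ones. A small sketch of wiring the configured checksum settings into a store file writer, mirroring the Store hunk above (illustrative only; it assumes the conf, cacheConf, fs and blockSize values a store already holds, and elides the other required builder options such as the output path):

  // build() may throw IOException; checksum options ride along with the other chained calls.
  StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs, blockSize)
      // ... comparator, bloom type, max key count, output path as in the Store hunk above ...
      .withChecksumType(Store.getChecksumType(conf))
      .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
      .build();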

View File

@ -248,6 +248,12 @@ public class RegionServerMetrics implements Updater {
public final MetricsTimeVaryingLong regionSplitFailureCount =
new MetricsTimeVaryingLong("regionSplitFailureCount", registry);
/**
* Number of times checksum verification failed.
*/
public final MetricsLongValue checksumFailuresCount =
new MetricsLongValue("checksumFailuresCount", registry);
public RegionServerMetrics() {
MetricsContext context = MetricsUtil.getContext("hbase");
metricsRecord = MetricsUtil.createRecord(context, "regionserver");
@ -346,6 +352,8 @@ public class RegionServerMetrics implements Updater {
// HFile metrics, positional reads
ops = HFile.getPreadOps();
if (ops != 0) this.fsPreadLatency.inc(ops, HFile.getPreadTimeMs());
this.checksumFailuresCount.set(HFile.getChecksumFailuresCount());
/* NOTE: removed HFile write latency. 2 reasons:
* 1) Mixing HLog latencies are far higher priority since they're
* on-demand and HFile is used in background (compact/flush)
@ -366,6 +374,7 @@ public class RegionServerMetrics implements Updater {
this.slowHLogAppendCount.pushMetric(this.metricsRecord);
this.regionSplitSuccessCount.pushMetric(this.metricsRecord);
this.regionSplitFailureCount.pushMetric(this.metricsRecord);
this.checksumFailuresCount.pushMetric(this.metricsRecord);
}
this.metricsRecord.update();
}

View File

@ -0,0 +1,99 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.lang.ClassNotFoundException;
import java.util.zip.Checksum;
import java.lang.reflect.Constructor;
/**
* Utility class that is used to generate a Checksum object.
* The Checksum implementation is pluggable, and an application
* can specify its own class that implements its own
* Checksum algorithm.
*/
public class ChecksumFactory {
static private final Class<?>[] EMPTY_ARRAY = new Class[]{};
/**
* Create a new instance of a Checksum object.
* @param className the name of the Checksum implementation class
* @return The newly created Checksum object
*/
static public Checksum newInstance(String className) throws IOException {
try {
Class<?> clazz = getClassByName(className);
return (Checksum)newInstance(clazz);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
/**
* Returns a Constructor that can be used to create a Checksum object.
* @param className the name of the class for which a constructor is created
* @return a Constructor that can be used to create a new Checksum object
*/
static public Constructor<?> newConstructor(String className)
throws IOException {
try {
Class<?> clazz = getClassByName(className);
Constructor<?> ctor = clazz.getDeclaredConstructor(EMPTY_ARRAY);
ctor.setAccessible(true);
return ctor;
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (java.lang.NoSuchMethodException e) {
throw new IOException(e);
}
}
/** Create an object of the given class using its no-argument constructor.
*
* @param theClass class of which an object is created
* @return a new object
*/
static private <T> T newInstance(Class<T> theClass) {
T result;
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(EMPTY_ARRAY);
ctor.setAccessible(true);
result = ctor.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
/**
* Load a class by name.
* @param name the class name.
* @return the class object.
* @throws ClassNotFoundException if the class is not found.
*/
static private Class<?> getClassByName(String name)
throws ClassNotFoundException {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
return Class.forName(name, true, classLoader);
}
}
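
A hedged usage sketch for the factory above (not part of the patch): instantiate a Checksum implementation by class name and run it over a buffer. java.util.zip.CRC32 is used because it is present on every JVM; the class name of the example and the payload are illustrative.

  import java.io.IOException;
  import java.util.zip.Checksum;
  import org.apache.hadoop.hbase.util.ChecksumFactory;

  public class ChecksumFactoryExample {
    public static void main(String[] args) throws IOException {
      // Reflectively load the checksum implementation and checksum a payload.
      Checksum crc = ChecksumFactory.newInstance("java.util.zip.CRC32");
      byte[] data = "hfile block payload".getBytes("UTF-8");
      crc.update(data, 0, data.length);
      // In the new block format a 4-byte checksum like this is stored per bytesPerChecksum chunk.
      System.out.println("CRC32 = " + crc.getValue());
    }
  }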

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ChecksumFactory;
/**
* Checksum types. The Checksum type is a one byte number
* that stores a representation of the checksum algorithm
* used to encode a hfile. The ordinal of these cannot
* change or else you risk breaking all existing HFiles out there.
*/
public enum ChecksumType {
NULL((byte)0) {
@Override
public String getName() {
return "NULL";
}
@Override
public void initialize() {
// do nothing
}
@Override
public Checksum getChecksumObject() throws IOException {
return null; // checksums not used
}
},
CRC32((byte)1) {
private volatile Constructor<?> ctor;
@Override
public String getName() {
return "CRC32";
}
@Override
public void initialize() {
final String PURECRC32 = "org.apache.hadoop.util.PureJavaCrc32";
final String JDKCRC = "java.util.zip.CRC32";
LOG = LogFactory.getLog(ChecksumType.class);
// check if hadoop library is available
try {
ctor = ChecksumFactory.newConstructor(PURECRC32);
LOG.info("Checksum using " + PURECRC32);
} catch (Exception e) {
LOG.info(PURECRC32 + " not available.");
}
try {
// The default checksum class name is java.util.zip.CRC32.
// This is available on all JVMs.
if (ctor == null) {
ctor = ChecksumFactory.newConstructor(JDKCRC);
LOG.info("Checksum can use " + JDKCRC);
}
} catch (Exception e) {
LOG.warn(JDKCRC + " not available. ", e);
}
}
@Override
public Checksum getChecksumObject() throws IOException {
if (ctor == null) {
throw new IOException("Bad constructor for " + getName());
}
try {
return (Checksum)ctor.newInstance();
} catch (Exception e) {
throw new IOException(e);
}
}
},
CRC32C((byte)2) {
private transient Constructor<?> ctor;
@Override
public String getName() {
return "CRC32C";
}
@Override
public void initialize() {
final String PURECRC32C = "org.apache.hadoop.util.PureJavaCrc32C";
LOG = LogFactory.getLog(ChecksumType.class);
try {
ctor = ChecksumFactory.newConstructor(PURECRC32C);
LOG.info("Checksum can use " + PURECRC32C);
} catch (Exception e) {
LOG.info(PURECRC32C + " not available. ");
}
}
@Override
public Checksum getChecksumObject() throws IOException {
if (ctor == null) {
throw new IOException("Bad constructor for " + getName());
}
try {
return (Checksum)ctor.newInstance();
} catch (Exception e) {
throw new IOException(e);
}
}
};
private final byte code;
protected Log LOG;
/** initializes the relevant checksum class object */
abstract void initialize();
/** returns the name of this checksum type */
public abstract String getName();
private ChecksumType(final byte c) {
this.code = c;
initialize();
}
/** returns an object that can be used to generate/validate checksums */
public abstract Checksum getChecksumObject() throws IOException;
public byte getCode() {
return this.code;
}
/**
* Cannot rely on enum ordinals. They change if an item is removed or moved.
* Do our own codes.
* @param b the persisted one-byte code
* @return Type associated with passed code.
*/
public static ChecksumType codeToType(final byte b) {
for (ChecksumType t : ChecksumType.values()) {
if (t.getCode() == b) {
return t;
}
}
throw new RuntimeException("Unknown checksum type code " + b);
}
/**
* Map a checksum name to a specific type.
* Do our own names.
* @param name the checksum name
* @return Type associated with passed name.
*/
public static ChecksumType nameToType(final String name) {
for (ChecksumType t : ChecksumType.values()) {
if (t.getName().equals(name)) {
return t;
}
}
throw new RuntimeException("Unknown checksum type name " + name);
}
}
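
Because the explicit byte code, not the enum ordinal, is what gets persisted with each block, readers map the stored byte back to a type with codeToType. A small round-trip sketch (illustrative only; the checksum name would normally come from the configuration handled in the Store hunk earlier):

  import java.io.IOException;
  import java.util.zip.Checksum;
  import org.apache.hadoop.hbase.util.ChecksumType;

  public class ChecksumTypeExample {
    public static void main(String[] args) throws IOException {
      ChecksumType configured = ChecksumType.nameToType("CRC32"); // writer side: resolve by name
      byte stored = configured.getCode();                         // the single byte recorded with the block
      ChecksumType onRead = ChecksumType.codeToType(stored);      // reader side: recover the type from that byte
      // PureJavaCrc32 if the hadoop class is on the classpath, otherwise java.util.zip.CRC32.
      Checksum checksum = onRead.getChecksumObject();
      System.out.println(onRead.getName() + " -> " + checksum.getClass().getName());
    }
  }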

View File

@ -109,7 +109,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
result = ByteBloomFilter.contains(key, keyOffset, keyLength,
bloomBuf.array(), bloomBuf.arrayOffset() + HFileBlock.HEADER_SIZE,
bloomBuf.array(), bloomBuf.arrayOffset() + bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
}

View File

@ -58,7 +58,9 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.master.HMaster;
@ -189,6 +191,9 @@ public class HBaseTestingUtility {
public HBaseTestingUtility(Configuration conf) {
this.conf = conf;
// a hbase checksum verification failure will cause unit tests to fail
ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
}
/**
@ -1437,7 +1442,7 @@ public class HBaseTestingUtility {
}
public FileSystem getTestFileSystem() throws IOException {
return FileSystem.get(conf);
return HFileSystem.get(conf);
}
/**

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
public class CacheTestUtils {
@ -323,7 +324,9 @@ public class CacheTestUtils {
HFileBlock generated = new HFileBlock(BlockType.DATA,
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
blockSize, includesMemstoreTS);
blockSize, includesMemstoreTS, HFileBlock.MINOR_VERSION_NO_CHECKSUM,
0, ChecksumType.NULL.getCode(),
onDiskSizeWithoutHeader + HFileBlock.HEADER_SIZE);
String strKey;
/* No conflicting keys */

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
@ -88,6 +90,8 @@ public class TestCacheOnWrite {
private static final int INDEX_BLOCK_SIZE = 512;
private static final int BLOOM_BLOCK_SIZE = 4096;
private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL;
private static final ChecksumType CKTYPE = ChecksumType.CRC32;
private static final int CKBYTES = 512;
/** The number of valid key types possible in a store file */
private static final int NUM_VALID_KEY_TYPES =
@ -192,7 +196,7 @@ public class TestCacheOnWrite {
conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
cowType.modifyConf(conf);
fs = FileSystem.get(conf);
fs = HFileSystem.get(conf);
cacheConf = new CacheConfig(conf);
blockCache = cacheConf.getBlockCache();
}
@ -292,6 +296,8 @@ public class TestCacheOnWrite {
.withComparator(KeyValue.COMPARATOR)
.withBloomType(BLOOM_TYPE)
.withMaxKeyCount(NUM_KV)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
final int rowLen = 32;

View File

@ -0,0 +1,290 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestChecksum {
// change this value to activate more logs
private static final boolean detailedLogging = true;
private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
NONE, GZ };
static final int[] BYTES_PER_CHECKSUM = {
50, 500, 688, 16*1024, (16*1024+980), 64 * 1024};
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private FileSystem fs;
private HFileSystem hfs;
@Before
public void setUp() throws Exception {
fs = HFileSystem.get(TEST_UTIL.getConfiguration());
hfs = (HFileSystem)fs;
}
/**
* Introduce checksum failures and check that we can still read
* the data
*/
@Test
public void testChecksumCorruption() throws IOException {
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
", pread=" + pread);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
true, HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1234; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
// Use hbase checksums.
assertEquals(true, hfs.useHBaseChecksum());
assertEquals(true, hfs.getNoChecksumFs() != hfs.getBackingFs());
// Do a read that purposely introduces checksum verification failures.
FSDataInputStream is = fs.open(path);
HFileBlock.FSReader hbr = new FSReaderV2Test(is, algo,
totalSize, HFile.MAX_FORMAT_VERSION, fs, path);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
// read data back from the hfile, exclude header and checksum
ByteBuffer bb = b.getBufferWithoutHeader(); // read back data
DataInputStream in = new DataInputStream(
new ByteArrayInputStream(
bb.array(), bb.arrayOffset(), bb.limit()));
// assert that we encountered hbase checksum verification failures
// but still used hdfs checksums and read data successfully.
assertEquals(1, HFile.getChecksumFailuresCount());
validateData(in);
// A single instance of hbase checksum failure causes the reader to
// switch off hbase checksum verification for the next 100 read
// requests. Verify that this is correct.
for (int i = 0; i <
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
b = hbr.readBlockData(0, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
}
// The next read should have hbase checksum verification re-enabled;
// we verify this by asserting that there was an hbase-checksum failure.
b = hbr.readBlockData(0, -1, -1, pread);
assertEquals(1, HFile.getChecksumFailuresCount());
// Since the above encountered a checksum failure, we switch
// back to not checking hbase checksums.
b = hbr.readBlockData(0, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
is.close();
// Now, use a completely new reader. Switch off hbase checksums in
// the configuration. In this case, we should not detect
// any retries within hbase.
HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
assertEquals(false, newfs.useHBaseChecksum());
is = newfs.open(path);
hbr = new FSReaderV2Test(is, algo,
totalSize, HFile.MAX_FORMAT_VERSION, newfs, path);
b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
// read data back from the hfile, exclude header and checksum
bb = b.getBufferWithoutHeader(); // read back data
in = new DataInputStream(new ByteArrayInputStream(
bb.array(), bb.arrayOffset(), bb.limit()));
// assert that we did not encounter hbase checksum verification failures
// but still used hdfs checksums and read data successfully.
assertEquals(0, HFile.getChecksumFailuresCount());
validateData(in);
}
}
}
/**
* Test different values of bytesPerChecksum
*/
@Test
public void testChecksumChunks() throws IOException {
Compression.Algorithm algo = NONE;
for (boolean pread : new boolean[] { false, true }) {
for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
algo + bytesPerChecksum);
FSDataOutputStream os = fs.create(path);
HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
true, HFile.DEFAULT_CHECKSUM_TYPE, bytesPerChecksum);
// write one block. The block has data
// that is at least 6 times the checksum chunk size
long dataSize = 0;
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (; dataSize < 6 * bytesPerChecksum;) {
for (int i = 0; i < 1234; ++i) {
dos.writeInt(i);
dataSize += 4;
}
}
hbw.writeHeaderAndData(os);
long totalSize = hbw.getOnDiskSizeWithHeader();
os.close();
long expectedChunks = ChecksumUtil.numChunks(
dataSize + HFileBlock.HEADER_SIZE,
bytesPerChecksum);
LOG.info("testChecksumChunks: pread=" + pread +
", bytesPerChecksum=" + bytesPerChecksum +
", fileSize=" + totalSize +
", dataSize=" + dataSize +
", expectedChunks=" + expectedChunks);
// Verify hbase checksums.
assertEquals(true, hfs.useHBaseChecksum());
assertEquals(true, hfs.getNoChecksumFs() != hfs.getBackingFs());
// Read data back from file.
FSDataInputStream is = fs.open(path);
FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, nochecksum,
algo, totalSize, HFile.MAX_FORMAT_VERSION, hfs, path);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
// verify that we have the expected number of checksum chunks
assertEquals(totalSize, HFileBlock.HEADER_SIZE + dataSize +
expectedChunks * HFileBlock.CHECKSUM_SIZE);
// assert that we did not encounter hbase checksum verification failures
assertEquals(0, HFile.getChecksumFailuresCount());
}
}
}
/**
* Test to ensure that there is at least one valid checksum implementation
*/
@Test
public void testChecksumAlgorithm() throws IOException {
ChecksumType type = ChecksumType.CRC32;
assertEquals(ChecksumType.nameToType(type.getName()), type);
assertEquals(ChecksumType.valueOf(type.toString()), type);
}
private void validateData(DataInputStream in) throws IOException {
// validate data
for (int i = 0; i < 1234; i++) {
int val = in.readInt();
if (val != i) {
String msg = "testChecksumCorruption: data mismatch at index " +
i + " expected " + i + " found " + val;
LOG.warn(msg);
assertEquals(i, val);
}
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
/**
* A class that introduces hbase-checksum failures while
* reading data from hfiles. This should trigger the hdfs level
* checksum validations.
*/
static private class FSReaderV2Test extends HFileBlock.FSReaderV2 {
FSReaderV2Test(FSDataInputStream istream, Algorithm algo,
long fileSize, int minorVersion, FileSystem fs,
Path path) throws IOException {
super(istream, istream, algo, fileSize, minorVersion,
(HFileSystem)fs, path);
}
@Override
protected boolean validateBlockChecksum(HFileBlock block,
byte[] data, int hdrSize) throws IOException {
return false; // checksum validation failure
}
}
}

View File

@ -52,7 +52,7 @@ public class TestFixedFileTrailer {
private static final Log LOG = LogFactory.getLog(TestFixedFileTrailer.class);
/** The number of used fields by version. Indexed by version minus one. */
private static final int[] NUM_FIELDS_BY_VERSION = new int[] { 8, 13 };
private static final int[] NUM_FIELDS_BY_VERSION = new int[] { 9, 14 };
private HBaseTestingUtility util = new HBaseTestingUtility();
private FileSystem fs;
@ -83,7 +83,8 @@ public class TestFixedFileTrailer {
@Test
public void testTrailer() throws IOException {
FixedFileTrailer t = new FixedFileTrailer(version);
FixedFileTrailer t = new FixedFileTrailer(version,
HFileBlock.MINOR_VERSION_NO_CHECKSUM);
t.setDataIndexCount(3);
t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
@ -121,7 +122,8 @@ public class TestFixedFileTrailer {
// Finished writing, trying to read.
{
DataInputStream dis = new DataInputStream(bais);
FixedFileTrailer t2 = new FixedFileTrailer(version);
FixedFileTrailer t2 = new FixedFileTrailer(version,
HFileBlock.MINOR_VERSION_NO_CHECKSUM);
t2.deserialize(dis);
assertEquals(-1, bais.read()); // Ensure we have read everything.
checkLoadedTrailer(version, t, t2);
@ -191,7 +193,7 @@ public class TestFixedFileTrailer {
private void checkLoadedTrailer(int version, FixedFileTrailer expected,
FixedFileTrailer loaded) throws IOException {
assertEquals(version, loaded.getVersion());
assertEquals(version, loaded.getMajorVersion());
assertEquals(expected.getDataIndexCount(), loaded.getDataIndexCount());
assertEquals(Math.min(expected.getEntryCount(),

View File

@ -48,9 +48,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;
@ -102,16 +104,16 @@ public class TestHFileBlock {
@Before
public void setUp() throws IOException {
fs = FileSystem.get(TEST_UTIL.getConfiguration());
fs = HFileSystem.get(TEST_UTIL.getConfiguration());
}
public void writeTestBlockContents(DataOutputStream dos) throws IOException {
static void writeTestBlockContents(DataOutputStream dos) throws IOException {
// This compresses really well.
for (int i = 0; i < 1000; ++i)
dos.writeInt(i / 100);
}
private int writeTestKeyValues(OutputStream dos, int seed)
static int writeTestKeyValues(OutputStream dos, int seed, boolean includesMemstoreTS)
throws IOException {
List<KeyValue> keyValues = new ArrayList<KeyValue>();
Random randomizer = new Random(42l + seed); // just any fixed number
@ -191,22 +193,24 @@ public class TestHFileBlock {
return baos.toByteArray();
}
private byte[] createTestV2Block(Compression.Algorithm algo)
throws IOException {
static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
boolean includesMemstoreTS) throws IOException {
final BlockType blockType = BlockType.DATA;
HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
includesMemstoreTS);
includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
DataOutputStream dos = hbw.startWriting(blockType);
writeTestBlockContents(dos);
byte[] headerAndData = hbw.getHeaderAndData();
byte[] headerAndData = hbw.getHeaderAndDataForTest();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
hbw.releaseCompressor();
return headerAndData;
return hbw;
}
public String createTestBlockStr(Compression.Algorithm algo,
int correctLength) throws IOException {
byte[] testV2Block = createTestV2Block(algo);
HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS);
byte[] testV2Block = hbw.getHeaderAndDataForTest();
int osOffset = HFileBlock.HEADER_SIZE + 9;
if (testV2Block.length == correctLength) {
// Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
@ -221,14 +225,16 @@ public class TestHFileBlock {
@Test
public void testNoCompression() throws IOException {
assertEquals(4000 + HFileBlock.HEADER_SIZE, createTestV2Block(NONE).length);
assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS).
getBlockForCaching().getUncompressedSizeWithoutHeader());
}
@Test
public void testGzipCompression() throws IOException {
final String correctTestBlockStr =
"DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
"DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
+ "\\xFF\\xFF\\xFF\\xFF"
+ "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
// gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
+ "\\x1F\\x8B" // gzip magic signature
+ "\\x08" // Compression method: 8 = "deflate"
@ -240,8 +246,9 @@ public class TestHFileBlock {
+ "\\x03"
+ "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
+ "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
final int correctGzipBlockLength = 82;
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00"
+ "\\xAB\\x85g\\x91"; // 4 byte checksum
final int correctGzipBlockLength = 95;
assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
correctGzipBlockLength));
}
@ -285,11 +292,14 @@ public class TestHFileBlock {
public void testReaderV2() throws IOException {
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
LOG.info("testReaderV2: Compression algorithm: " + algo +
", pread=" + pread);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null,
includesMemstoreTS);
includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
@ -305,16 +315,19 @@ public class TestHFileBlock {
totalSize);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
String blockStr = b.toString();
if (algo == GZ) {
is = fs.open(path);
hbr = new HFileBlock.FSReaderV2(is, algo, totalSize);
b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE, -1, pread);
b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE +
b.totalChecksumBytes(), -1, pread);
assertEquals(blockStr, b.toString());
int wrongCompressedSize = 2172;
try {
@ -351,13 +364,15 @@ public class TestHFileBlock {
HFileDataBlockEncoder dataBlockEncoder =
new HFileDataBlockEncoderImpl(encoding);
HFileBlock.Writer hbw = new HFileBlock.Writer(algo, dataBlockEncoder,
includesMemstoreTS);
includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<Integer>();
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
writeEncodedBlock(encoding, hbw, encodedSizes, encodedBlocks,
blockId);
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks,
blockId, includesMemstoreTS);
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
@ -374,6 +389,7 @@ public class TestHFileBlock {
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
b = hbr.readBlockData(pos, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck();
pos += b.getOnDiskSizeWithHeader();
@ -401,16 +417,16 @@ public class TestHFileBlock {
}
}
private void writeEncodedBlock(DataBlockEncoding encoding,
HFileBlock.Writer hbw, final List<Integer> encodedSizes,
final List<ByteBuffer> encodedBlocks, int blockId) throws IOException {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
static void writeEncodedBlock(DataBlockEncoding encoding,
DataOutputStream dos, final List<Integer> encodedSizes,
final List<ByteBuffer> encodedBlocks, int blockId,
boolean includesMemstoreTS) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DoubleOutputStream doubleOutputStream =
new DoubleOutputStream(dos, baos);
final int rawBlockSize = writeTestKeyValues(doubleOutputStream,
blockId);
blockId, includesMemstoreTS);
ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
rawBuf.rewind();
@ -434,7 +450,7 @@ public class TestHFileBlock {
encodedBlocks.add(encodedBuf);
}
private void assertBuffersEqual(ByteBuffer expectedBuffer,
static void assertBuffersEqual(ByteBuffer expectedBuffer,
ByteBuffer actualBuffer, Compression.Algorithm compression,
DataBlockEncoding encoding, boolean pread) {
if (!actualBuffer.equals(expectedBuffer)) {
@ -471,7 +487,9 @@ public class TestHFileBlock {
for (boolean pread : BOOLEAN_VALUES) {
for (boolean cacheOnWrite : BOOLEAN_VALUES) {
Random rand = defaultRandom();
LOG.info("Compression algorithm: " + algo + ", pread=" + pread);
LOG.info("testPreviousOffset:Compression algorithm: " + algo +
", pread=" + pread +
", cacheOnWrite=" + cacheOnWrite);
Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
List<Long> expectedOffsets = new ArrayList<Long>();
List<Long> expectedPrevOffsets = new ArrayList<Long>();
@ -522,17 +540,23 @@ public class TestHFileBlock {
b2.getUncompressedSizeWithoutHeader());
assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
assertEquals(curOffset, b2.getOffset());
assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
assertEquals(b.getOnDiskDataSizeWithHeader(),
b2.getOnDiskDataSizeWithHeader());
assertEquals(0, HFile.getChecksumFailuresCount());
curOffset += b.getOnDiskSizeWithHeader();
if (cacheOnWrite) {
// In the cache-on-write mode we store uncompressed bytes so we
// can compare them to what was read by the block reader.
// b's buffer has header + data + checksum while
// expectedContents have header + data only
ByteBuffer bufRead = b.getBufferWithHeader();
ByteBuffer bufExpected = expectedContents.get(i);
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
bufRead.arrayOffset(), bufRead.limit(),
bufRead.arrayOffset(),
bufRead.limit() - b.totalChecksumBytes(),
bufExpected.array(), bufExpected.arrayOffset(),
bufExpected.limit()) == 0;
String wrongBytesMsg = "";
@ -541,15 +565,26 @@ public class TestHFileBlock {
// Optimization: only construct an error message in case we
// will need it.
wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
+ algo + ", pread=" + pread + "):\n";
+ algo + ", pread=" + pread
+ ", cacheOnWrite=" + cacheOnWrite + "):\n";
wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
bufExpected.arrayOffset(), Math.min(32,
bufExpected.limit()))
+ ", actual:\n"
+ Bytes.toStringBinary(bufRead.array(),
bufRead.arrayOffset(), Math.min(32, bufRead.limit()));
if (detailedLogging) {
LOG.warn("expected header" +
HFileBlock.toStringHeader(bufExpected) +
"\nfound header" +
HFileBlock.toStringHeader(bufRead));
LOG.warn("bufread offset " + bufRead.arrayOffset() +
" limit " + bufRead.limit() +
" expected offset " + bufExpected.arrayOffset() +
" limit " + bufExpected.limit());
LOG.warn(wrongBytesMsg);
}
}
assertTrue(wrongBytesMsg, bytesAreCorrect);
}
}
@ -672,10 +707,12 @@ public class TestHFileBlock {
boolean cacheOnWrite = expectedContents != null;
FSDataOutputStream os = fs.create(path);
HFileBlock.Writer hbw = new HFileBlock.Writer(compressAlgo, null,
includesMemstoreTS);
includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
long totalSize = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
long pos = os.getPos();
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
blockTypeOrdinal = BlockType.DATA.ordinal();
@ -706,9 +743,9 @@ public class TestHFileBlock {
expectedContents.add(hbw.getUncompressedBufferWithHeader());
if (detailedLogging) {
LOG.info("Writing block #" + i + " of type " + bt
LOG.info("Written block #" + i + " of type " + bt
+ ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
+ " at offset " + os.getPos());
+ " at offset " + pos);
}
}
os.close();
@ -730,7 +767,9 @@ public class TestHFileBlock {
byte[] byteArr = new byte[HFileBlock.HEADER_SIZE + size];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, includesMemstoreTS);
HFileBlock.FILL_HEADER, -1, includesMemstoreTS,
HFileBlock.MINOR_VERSION_NO_CHECKSUM, 0, ChecksumType.NULL.getCode(),
0);
long byteBufferExpectedSize =
ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
+ HFileBlock.HEADER_SIZE + size);

View File

@ -0,0 +1,806 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* This class has unit tests to prove that older versions of
* HFiles (without checksums) are compatible with current readers.
*/
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlockCompatibility {
// change this value to activate more logs
private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
NONE, GZ };
// The minor version for pre-checksum files
private static int MINOR_VERSION = 0;
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private HFileSystem fs;
private int uncompressedSizeV1;
private final boolean includesMemstoreTS;
public TestHFileBlockCompatibility(boolean includesMemstoreTS) {
this.includesMemstoreTS = includesMemstoreTS;
}
@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
}
@Before
public void setUp() throws IOException {
fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
}
public byte[] createTestV1Block(Compression.Algorithm algo)
throws IOException {
Compressor compressor = algo.getCompressor();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream os = algo.createCompressionStream(baos, compressor, 0);
DataOutputStream dos = new DataOutputStream(os);
BlockType.META.write(dos); // Let's make this a meta block.
TestHFileBlock.writeTestBlockContents(dos);
uncompressedSizeV1 = dos.size();
dos.flush();
algo.returnCompressor(compressor);
return baos.toByteArray();
}
private Writer createTestV2Block(Compression.Algorithm algo)
throws IOException {
final BlockType blockType = BlockType.DATA;
Writer hbw = new Writer(algo, null,
includesMemstoreTS);
DataOutputStream dos = hbw.startWriting(blockType);
TestHFileBlock.writeTestBlockContents(dos);
byte[] headerAndData = hbw.getHeaderAndData();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
hbw.releaseCompressor();
return hbw;
}
private String createTestBlockStr(Compression.Algorithm algo,
int correctLength) throws IOException {
Writer hbw = createTestV2Block(algo);
byte[] testV2Block = hbw.getHeaderAndData();
int osOffset = HFileBlock.HEADER_SIZE_NO_CHECKSUM + 9;
if (testV2Block.length == correctLength) {
// Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
// variations across operating systems.
// See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
testV2Block[osOffset] = 3;
}
return Bytes.toStringBinary(testV2Block);
}
@Test
public void testNoCompression() throws IOException {
assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
getUncompressedSizeWithoutHeader());
}
@Test
public void testGzipCompression() throws IOException {
final String correctTestBlockStr =
"DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
+ "\\xFF\\xFF\\xFF\\xFF"
// gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
+ "\\x1F\\x8B" // gzip magic signature
+ "\\x08" // Compression method: 8 = "deflate"
+ "\\x00" // Flags
+ "\\x00\\x00\\x00\\x00" // mtime
+ "\\x00" // XFL (extra flags)
// OS (0 = FAT filesystems, 3 = Unix). However, this field
// sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
+ "\\x03"
+ "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
+ "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
final int correctGzipBlockLength = 82;
assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
correctGzipBlockLength));
}
@Test
public void testReaderV1() throws IOException {
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
byte[] block = createTestV1Block(algo);
Path path = new Path(TEST_UTIL.getDataTestDir(),
"blocks_v1_"+ algo);
LOG.info("Creating temporary file at " + path);
FSDataOutputStream os = fs.create(path);
int totalSize = 0;
int numBlocks = 50;
for (int i = 0; i < numBlocks; ++i) {
os.write(block);
totalSize += block.length;
}
os.close();
FSDataInputStream is = fs.open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo,
totalSize);
HFileBlock b;
int numBlocksRead = 0;
long pos = 0;
while (pos < totalSize) {
b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
b.sanityCheck();
pos += block.length;
numBlocksRead++;
}
assertEquals(numBlocks, numBlocksRead);
is.close();
}
}
}
@Test
public void testReaderV2() throws IOException {
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
LOG.info("testReaderV2: Compression algorithm: " + algo +
", pread=" + pread);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
Writer hbw = new Writer(algo, null,
includesMemstoreTS);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1234; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
FSDataInputStream is = fs.open(path);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, is, algo,
totalSize, MINOR_VERSION, fs, path);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
String blockStr = b.toString();
if (algo == GZ) {
is = fs.open(path);
hbr = new HFileBlock.FSReaderV2(is, is, algo, totalSize, MINOR_VERSION,
fs, path);
b = hbr.readBlockData(0, 2173 + HFileBlock.HEADER_SIZE_NO_CHECKSUM +
b.totalChecksumBytes(), -1, pread);
assertEquals(blockStr, b.toString());
int wrongCompressedSize = 2172;
try {
b = hbr.readBlockData(0, wrongCompressedSize
+ HFileBlock.HEADER_SIZE_NO_CHECKSUM, -1, pread);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "On-disk size without header provided is "
+ wrongCompressedSize + ", but block header contains "
+ b.getOnDiskSizeWithoutHeader() + ".";
assertTrue("Invalid exception message: '" + ex.getMessage()
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
}
is.close();
}
}
}
}
/**
* Test encoding/decoding data blocks.
* @throws IOException a bug or a problem with temporary files.
*/
@Test
public void testDataBlockEncoding() throws IOException {
final int numBlocks = 5;
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
LOG.info("testDataBlockEncoding algo " + algo +
" pread = " + pread +
" encoding " + encoding);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo + "_" + encoding.toString());
FSDataOutputStream os = fs.create(path);
HFileDataBlockEncoder dataBlockEncoder =
new HFileDataBlockEncoderImpl(encoding);
Writer hbw = new Writer(algo, dataBlockEncoder,
includesMemstoreTS);
long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<Integer>();
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
TestHFileBlock.writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks,
blockId, includesMemstoreTS);
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
FSDataInputStream is = fs.open(path);
HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, is, algo,
totalSize, MINOR_VERSION, fs, path);
hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemstoreTS(includesMemstoreTS);
HFileBlock b;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
b = hbr.readBlockData(pos, -1, -1, pread);
b.sanityCheck();
pos += b.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId),
b.getUncompressedSizeWithoutHeader());
ByteBuffer actualBuffer = b.getBufferWithoutHeader();
if (encoding != DataBlockEncoding.NONE) {
// We expect a two-byte big-endian encoding id.
assertEquals(0, actualBuffer.get(0));
assertEquals(encoding.getId(), actualBuffer.get(1));
actualBuffer.position(2);
actualBuffer = actualBuffer.slice();
}
ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
expectedBuffer.rewind();
// test if content matches, produce nice message
TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
pread);
}
is.close();
}
}
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
/**
* This is the version of the HFileBlock.Writer that is used to
* create V2 blocks with minor version 0. These blocks do not
* have hbase-level checksums. The code is here to test
* backward compatibility. The reason we do not inherit from
* HFileBlock.Writer is that we never want to change the code
* in this class, whereas the code in HFileBlock.Writer will continue
* to evolve.
*/
public static final class Writer {
// These constants are as they were in minorVersion 0.
private static final int HEADER_SIZE = HFileBlock.HEADER_SIZE_NO_CHECKSUM;
private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
private static final byte[] DUMMY_HEADER =
HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
private enum State {
INIT,
WRITING,
BLOCK_READY
};
/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;
/** Compression algorithm for all blocks this instance writes. */
private final Compression.Algorithm compressAlgo;
/** Data block encoder used for data blocks */
private final HFileDataBlockEncoder dataBlockEncoder;
/**
* The stream we use to accumulate data in uncompressed format for each
* block. We reset this stream at the end of each block and reuse it. The
* header is written as the first {@link #HEADER_SIZE} bytes into this
* stream.
*/
private ByteArrayOutputStream baosInMemory;
/** Compressor, which is also reused between consecutive blocks. */
private Compressor compressor;
/** Compression output stream */
private CompressionOutputStream compressionStream;
/** Underlying stream to write compressed bytes to */
private ByteArrayOutputStream compressedByteStream;
/**
* Current block type. Set in {@link #startWriting(BlockType)}. Could be
* changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
* to {@link BlockType#ENCODED_DATA}.
*/
private BlockType blockType;
/**
* A stream that we write uncompressed bytes to, which compresses them and
* writes them to {@link #baosInMemory}.
*/
private DataOutputStream userDataStream;
/**
* Bytes to be written to the file system, including the header. Compressed
* if compression is turned on.
*/
private byte[] onDiskBytesWithHeader;
/**
* Valid in the READY state. Contains the header and the uncompressed (but
* potentially encoded, if this is a data block) bytes, so the length is
* {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}.
*/
private byte[] uncompressedBytesWithHeader;
/**
* Current block's start offset in the {@link HFile}. Set in
* {@link #writeHeaderAndData(FSDataOutputStream)}.
*/
private long startOffset;
/**
* Offset of previous block by block type. Updated when the next block is
* started.
*/
private long[] prevOffsetByType;
/** The offset of the previous block of the same type */
private long prevOffset;
/** Whether we are including memstore timestamp after every key/value */
private boolean includesMemstoreTS;
/**
* @param compressionAlgorithm compression algorithm to use
* @param dataBlockEncoder data block encoder to use
* @param includesMemstoreTS whether a memstore timestamp is written after every key/value
*/
public Writer(Compression.Algorithm compressionAlgorithm,
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
this.dataBlockEncoder = dataBlockEncoder != null
? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
baosInMemory = new ByteArrayOutputStream();
if (compressAlgo != NONE) {
compressor = compressionAlgorithm.getCompressor();
compressedByteStream = new ByteArrayOutputStream();
try {
compressionStream =
compressionAlgorithm.createPlainCompressionStream(
compressedByteStream, compressor);
} catch (IOException e) {
throw new RuntimeException("Could not create compression stream " +
"for algorithm " + compressionAlgorithm, e);
}
}
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i)
prevOffsetByType[i] = -1;
this.includesMemstoreTS = includesMemstoreTS;
}
/**
* Starts writing into the block. The previous block's data is discarded.
*
* @return the stream the user can write their data into
* @throws IOException
*/
public DataOutputStream startWriting(BlockType newBlockType)
throws IOException {
if (state == State.BLOCK_READY && startOffset != -1) {
// We had a previous block that was written to a stream at a specific
// offset. Save that offset as the last offset of a block of that type.
prevOffsetByType[blockType.getId()] = startOffset;
}
startOffset = -1;
blockType = newBlockType;
baosInMemory.reset();
baosInMemory.write(DUMMY_HEADER);
state = State.WRITING;
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
return userDataStream;
}
/**
* Returns the stream for the user to write to. The block writer takes care
* of handling compression and buffering for caching on write. Can only be
* called in the "writing" state.
*
* @return the data output stream for the user to write to
*/
DataOutputStream getUserDataStream() {
expectState(State.WRITING);
return userDataStream;
}
/**
* Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished.
*/
private void ensureBlockReady() throws IOException {
Preconditions.checkState(state != State.INIT,
"Unexpected state: " + state);
if (state == State.BLOCK_READY)
return;
// This will set state to BLOCK_READY.
finishBlock();
}
/**
* An internal method that finishes the current block: flushes the user data
* stream, applies data block encoding and compression if configured, fills in
* the header, and sets the block write state to "block ready".
*/
private void finishBlock() throws IOException {
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
LOG.warn("Writer.finishBlock user data size with header before compression " +
uncompressedBytesWithHeader.length);
prevOffset = prevOffsetByType[blockType.getId()];
// We need to set state before we can package the block up for
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
state = State.BLOCK_READY;
encodeDataBlockForDisk();
doCompression();
putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
}
/**
* Do compression if it is enabled, or re-use the uncompressed buffer if
* it is not. Fills in the compressed block's header if doing compression.
*/
private void doCompression() throws IOException {
// do the compression
if (compressAlgo != NONE) {
compressedByteStream.reset();
compressedByteStream.write(DUMMY_HEADER);
compressionStream.resetState();
compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
uncompressedBytesWithHeader.length - HEADER_SIZE);
compressionStream.flush();
compressionStream.finish();
onDiskBytesWithHeader = compressedByteStream.toByteArray();
putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
} else {
onDiskBytesWithHeader = uncompressedBytesWithHeader;
}
}
/**
* Encodes this block if it is a data block and encoding is turned on in
* {@link #dataBlockEncoder}.
*/
private void encodeDataBlockForDisk() throws IOException {
if (blockType != BlockType.DATA) {
return; // skip any non-data block
}
// do data block encoding, if data block encoder is set
ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
HEADER_SIZE, uncompressedBytesWithHeader.length -
HEADER_SIZE).slice();
Pair<ByteBuffer, BlockType> encodingResult =
dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
includesMemstoreTS, DUMMY_HEADER);
BlockType encodedBlockType = encodingResult.getSecond();
if (encodedBlockType == BlockType.ENCODED_DATA) {
uncompressedBytesWithHeader = encodingResult.getFirst().array();
blockType = BlockType.ENCODED_DATA;
} else {
// There is no encoding configured. Do some extra sanity-checking.
if (encodedBlockType != BlockType.DATA) {
throw new IOException("Unexpected block type coming out of data " +
"block encoder: " + encodedBlockType);
}
if (userDataStream.size() !=
uncompressedBytesWithHeader.length - HEADER_SIZE) {
throw new IOException("Uncompressed size mismatch: "
+ userDataStream.size() + " vs. "
+ (uncompressedBytesWithHeader.length - HEADER_SIZE));
}
}
}
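// Encoder contract sketch (illustrative, not part of this patch): the call
// made above, seen from a caller's perspective. "rawKeyValues" is a
// hypothetical buffer holding the block body without the header.
//
//   Pair<ByteBuffer, BlockType> result =
//       NoOpDataBlockEncoder.INSTANCE.beforeWriteToDisk(rawKeyValues,
//           includesMemstoreTS, DUMMY_HEADER);
//   if (result.getSecond() == BlockType.ENCODED_DATA) {
//     // an encoding was applied; the returned buffer replaces the raw bytes
//   } else {
//     // no encoding configured; the block stays a plain DATA block
//   }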
/**
* Puts the header into the given byte array at the given offset.
* @param dest the destination byte array to write the header into
* @param offset the offset in the destination array at which to start writing
* @param onDiskSize size of the block on disk, including the header
* @param uncompressedSize size of the block after decompression (but
* before optional data block decoding), including the header
*/
private void putHeader(byte[] dest, int offset, int onDiskSize,
int uncompressedSize) {
offset = blockType.put(dest, offset);
offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
Bytes.putLong(dest, offset, prevOffset);
}
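// Header layout sketch derived from putHeader() above (the 8-byte length of
// the block type magic is an assumption of this sketch, not stated in this
// listing):
//
//   offset  0 : 8-byte block type magic
//   offset  8 : int  onDiskSizeWithoutHeader
//   offset 12 : int  uncompressedSizeWithoutHeader
//   offset 16 : long prevOffset (previous block of the same type)
//
// Reading the fields back, e.g. for debugging:
//
//   int onDiskSizeWithoutHeader = Bytes.toInt(header, 8);
//   int uncompressedSizeWithoutHeader = Bytes.toInt(header, 12);
//   long prevBlockOffset = Bytes.toLong(header, 16);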
/**
* Similar to {@link #writeHeaderAndData(DataOutputStream)}, but also records
* the offset of this block so that it can be referenced in the next block
* of the same type.
*
* @param out the output stream to write the block to
* @throws IOException
*/
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
long offset = out.getPos();
if (startOffset != -1 && offset != startOffset) {
throw new IOException("A " + blockType + " block written to a "
+ "stream twice, first at offset " + startOffset + ", then at "
+ offset);
}
startOffset = offset;
writeHeaderAndData((DataOutputStream) out);
}
/**
* Writes the header and the compressed data of this block (or uncompressed
* data when not using compression) into the given stream. Can be called in
* the "writing" state or in the "block ready" state. If called in the
* "writing" state, transitions the writer to the "block ready" state.
*
* @param out the output stream to write the block to
* @throws IOException
*/
private void writeHeaderAndData(DataOutputStream out) throws IOException {
ensureBlockReady();
out.write(onDiskBytesWithHeader);
}
/**
* Returns the header followed by the compressed data (or uncompressed data when not
* using compression) as a byte array. Can be called in the "writing" state
* or in the "block ready" state. If called in the "writing" state,
* transitions the writer to the "block ready" state.
*
* @return header and data as they would be stored on disk in a byte array
* @throws IOException
*/
public byte[] getHeaderAndData() throws IOException {
ensureBlockReady();
return onDiskBytesWithHeader;
}
/**
* Returns the compressor used by this writer to the compressor pool.
* Needs to be called before the writer is discarded.
*/
public void releaseCompressor() {
if (compressor != null) {
compressAlgo.returnCompressor(compressor);
compressor = null;
}
}
/**
* Returns the on-disk size of the data portion of the block. This is the
* compressed size if compression is enabled. Can only be called in the
* "block ready" state. Header is not compressed, and its size is not
* included in the return value.
*
* @return the on-disk size of the block, not including the header.
*/
public int getOnDiskSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return onDiskBytesWithHeader.length - HEADER_SIZE;
}
/**
* Returns the on-disk size of the block. Can only be called in the
* "block ready" state.
*
* @return the on-disk size of the block ready to be written, including the
* header size
*/
public int getOnDiskSizeWithHeader() {
expectState(State.BLOCK_READY);
return onDiskBytesWithHeader.length;
}
/**
* The uncompressed size of the block data. Does not include header size.
*/
public int getUncompressedSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return uncompressedBytesWithHeader.length - HEADER_SIZE;
}
/**
* The uncompressed size of the block data, including header size.
*/
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return uncompressedBytesWithHeader.length;
}
/** @return true if a block is being written */
public boolean isWriting() {
return state == State.WRITING;
}
/**
* Returns the number of bytes written into the current block so far, or
* zero if not writing the block at the moment. Note that this will return
* zero in the "block ready" state as well.
*
* @return the number of bytes written
*/
public int blockSizeWritten() {
if (state != State.WRITING)
return 0;
return userDataStream.size();
}
/**
* Returns the header followed by the uncompressed data, even if using
* compression. This is needed for storing uncompressed blocks in the block
* cache. Can be called in the "writing" state or the "block ready" state.
*
* @return uncompressed block bytes for caching on write
*/
private byte[] getUncompressedDataWithHeader() {
expectState(State.BLOCK_READY);
return uncompressedBytesWithHeader;
}
private void expectState(State expectedState) {
if (state != expectedState) {
throw new IllegalStateException("Expected state: " + expectedState +
", actual state: " + state);
}
}
/**
* Similar to {@link #getUncompressedDataWithHeader()}, but returns the data in a byte
* buffer.
*
* @return uncompressed block for caching on write in the form of a buffer
*/
public ByteBuffer getUncompressedBufferWithHeader() {
byte[] b = getUncompressedDataWithHeader();
return ByteBuffer.wrap(b, 0, b.length);
}
/**
* Takes the given {@link BlockWritable} instance, creates a new block of
* its appropriate type, writes the writable into this block, and flushes
* the block into the output stream. The writer is instructed not to buffer
* uncompressed bytes for cache-on-write.
*
* @param bw the block-writable object to write as a block
* @param out the file system output stream
* @throws IOException
*/
public void writeBlock(BlockWritable bw, FSDataOutputStream out)
throws IOException {
bw.writeToBlock(startWriting(bw.getBlockType()));
writeHeaderAndData(out);
}
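// A minimal BlockWritable sketch (illustrative; assumes the interface exposes
// getBlockType() and writeToBlock(DataOutput), which is how writeBlock() above
// uses it). "MetaPayload" and its contents are hypothetical.
//
//   class MetaPayload implements BlockWritable {
//     private final byte[] payload;
//     MetaPayload(byte[] payload) { this.payload = payload; }
//     public BlockType getBlockType() { return BlockType.META; }
//     public void writeToBlock(DataOutput out) throws IOException {
//       out.write(payload);
//     }
//   }
//
//   // usage: writer.writeBlock(new MetaPayload(bytes), out);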
/**
* Creates a new HFileBlock from the current contents of this writer, for
* caching on write. The cached block carries no per-block checksum data
* ({@link ChecksumType#NULL}).
*/
public HFileBlock getBlockForCaching() {
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset,
getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
includesMemstoreTS, MINOR_VERSION, 0, ChecksumType.NULL.getCode(),
getOnDiskSizeWithoutHeader());
}
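// Cache-on-write sketch (illustrative; the cache handle and key below are
// assumptions, not part of this listing): once writeHeaderAndData(out) has
// been called, the uncompressed copy built here can be handed to the block
// cache, keyed by the block's start offset.
//
//   HFileBlock blockForCache = getBlockForCaching();
//   // blockCache.cacheBlock(cacheKey, blockForCache);  // hypothetical handle/key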
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
import org.apache.hadoop.hbase.util.Bytes;
@ -110,7 +111,7 @@ public class TestHFileBlockIndex {
// This test requires at least HFile format version 2.
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
fs = FileSystem.get(conf);
fs = HFileSystem.get(conf);
}
@Test
@ -215,7 +216,8 @@ public class TestHFileBlockIndex {
private void writeWholeIndex() throws IOException {
assertEquals(0, keys.size());
HFileBlock.Writer hbw = new HFileBlock.Writer(compr, null,
includesMemstoreTS);
includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE,
HFile.DEFAULT_BYTES_PER_CHECKSUM);
FSDataOutputStream outputStream = fs.create(path);
HFileBlockIndex.BlockIndexWriter biw =
new HFileBlockIndex.BlockIndexWriter(hbw, null, null);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Before;
@ -123,12 +124,14 @@ public class TestHFileDataBlockEncoder {
HFileBlock block = getSampleHFileBlock();
Pair<ByteBuffer, BlockType> result =
blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(),
includesMemstoreTS);
includesMemstoreTS, HFileBlock.DUMMY_HEADER);
int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE;
HFileBlock blockOnDisk = new HFileBlock(result.getSecond(),
size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0,
includesMemstoreTS);
includesMemstoreTS, block.getMinorVersion(),
block.getBytesPerChecksum(), block.getChecksumType(),
block.getOnDiskDataSizeWithHeader());
if (blockEncoder.getEncodingOnDisk() !=
DataBlockEncoding.NONE) {
@ -158,7 +161,8 @@ public class TestHFileDataBlockEncoder {
keyValues.rewind();
buf.put(keyValues);
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, 0, includesMemstoreTS);
HFileBlock.FILL_HEADER, 0, includesMemstoreTS,
HFileReaderV2.MAX_MINOR_VERSION, 0, ChecksumType.NULL.getCode(), 0);
UNKNOWN_TABLE_AND_CF.passSchemaMetricsTo(b);
return b;
}

View File

@ -76,7 +76,7 @@ public class TestHFileReaderV1 {
assertEquals(N, reader.getEntries());
assertEquals(N, trailer.getEntryCount());
assertEquals(1, trailer.getVersion());
assertEquals(1, trailer.getMajorVersion());
assertEquals(Compression.Algorithm.GZ, trailer.getCompressionCodec());
for (boolean pread : new boolean[] { false, true }) {

View File

@ -123,7 +123,7 @@ public class TestHFileWriterV2 {
FixedFileTrailer trailer =
FixedFileTrailer.readFromStream(fsdis, fileSize);
assertEquals(2, trailer.getVersion());
assertEquals(2, trailer.getMajorVersion());
assertEquals(ENTRY_COUNT, trailer.getEntryCount());
HFileBlock.FSReader blockReader =

View File

@ -189,6 +189,8 @@ public class CreateRandomStoreFile {
.withCompression(compr)
.withBloomType(bloomType)
.withMaxKeyCount(numKV)
.withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
.withBytesPerChecksum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.build();
rand = new Random();

View File

@ -356,6 +356,8 @@ public class HFileReadWriteTest {
.withDataBlockEncoder(dataBlockEncoder)
.withBloomType(bloomType)
.withMaxKeyCount(maxKeyCount)
.withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
.withBytesPerChecksum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.build();
StatisticsPrinter statsPrinter = new StatisticsPrinter();

View File

@ -298,6 +298,8 @@ public class TestCompoundBloomFilter {
BLOCK_SIZES[t])
.withOutputDir(TEST_UTIL.getDataTestDir())
.withBloomType(bt)
.withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
.withBytesPerChecksum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.build();
assertTrue(w.hasGeneralBloom());

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
@ -69,10 +70,12 @@ public class TestFSErrorsExposed {
Path hfilePath = new Path(new Path(
util.getDataTestDir("internalScannerExposesErrors"),
"regionname"), "familyname");
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
FileSystem fs = new HFileSystem(faultyfs);
CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
StoreFile.Writer writer = new StoreFile.WriterBuilder(
util.getConfiguration(), cacheConf, fs, 2*1024)
util.getConfiguration(), cacheConf, hfs, 2*1024)
.withOutputDir(hfilePath)
.build();
TestStoreFile.writeStoreFile(
@ -85,14 +88,14 @@ public class TestFSErrorsExposed {
StoreFile.Reader reader = sf.createReader();
HFileScanner scanner = reader.getScanner(false, true);
FaultyInputStream inStream = fs.inStreams.get(0).get();
FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
assertNotNull(inStream);
scanner.seekTo();
// Do at least one successful read
assertTrue(scanner.next());
inStream.startFaults();
faultyfs.startFaults();
try {
int scanned=0;
@ -116,10 +119,12 @@ public class TestFSErrorsExposed {
Path hfilePath = new Path(new Path(
util.getDataTestDir("internalScannerExposesErrors"),
"regionname"), "familyname");
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
HFileSystem fs = new HFileSystem(faultyfs);
CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
StoreFile.Writer writer = new StoreFile.WriterBuilder(
util.getConfiguration(), cacheConf, fs, 2 * 1024)
util.getConfiguration(), cacheConf, hfs, 2 * 1024)
.withOutputDir(hfilePath)
.build();
TestStoreFile.writeStoreFile(
@ -132,14 +137,13 @@ public class TestFSErrorsExposed {
Collections.singletonList(sf), false, true, false);
KeyValueScanner scanner = scanners.get(0);
FaultyInputStream inStream = fs.inStreams.get(0).get();
FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
assertNotNull(inStream);
scanner.seek(KeyValue.LOWESTKEY);
// Do at least one successful read
assertNotNull(scanner.next());
inStream.startFaults();
faultyfs.startFaults();
try {
int scanned=0;
@ -220,6 +224,15 @@ public class TestFSErrorsExposed {
inStreams.add(new SoftReference<FaultyInputStream>(faulty));
return faulty;
}
/**
* Starts to simulate faults on all streams opened so far
*/
public void startFaults() {
for (SoftReference<FaultyInputStream> is: inStreams) {
is.get().startFaults();
}
}
}
static class FaultyInputStream extends FSDataInputStream {

View File

@ -3173,7 +3173,7 @@ public class TestHRegion extends HBaseTestCase {
// set up a cluster with 3 nodes
MiniHBaseCluster cluster;
MiniHBaseCluster cluster = null;
String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
int regionServersCount = 3;
@ -3221,7 +3221,9 @@ public class TestHRegion extends HBaseTestCase {
ht.close();
} finally {
htu.shutdownMiniCluster();
if (cluster != null) {
htu.shutdownMiniCluster();
}
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@ -69,6 +70,9 @@ public class TestStoreFile extends HBaseTestCase {
private String ROOT_DIR;
private Map<String, Long> startingMetrics;
private static final ChecksumType CKTYPE = ChecksumType.CRC32;
private static final int CKBYTES = 512;
@Override
public void setUp() throws Exception {
super.setUp();
@ -401,6 +405,8 @@ public class TestStoreFile extends HBaseTestCase {
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
bloomWriteRead(writer, fs);
}
@ -420,6 +426,8 @@ public class TestStoreFile extends HBaseTestCase {
fs, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
.withFilePath(f)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
// add delete family
@ -490,6 +498,8 @@ public class TestStoreFile extends HBaseTestCase {
.withFilePath(f)
.withBloomType(bt[x])
.withMaxKeyCount(expKeys[x])
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
long now = System.currentTimeMillis();
@ -565,6 +575,8 @@ public class TestStoreFile extends HBaseTestCase {
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
assertFalse(writer.hasGeneralBloom());
writer.close();
@ -592,6 +604,8 @@ public class TestStoreFile extends HBaseTestCase {
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(Integer.MAX_VALUE)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
assertFalse(writer.hasGeneralBloom());
writer.close();
@ -859,6 +873,8 @@ public class TestStoreFile extends HBaseTestCase {
blockSize)
.withFilePath(path)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
// We'll write N-1 KVs to ensure we don't write an extra block
kvs.remove(kvs.size()-1);
@ -890,6 +906,8 @@ public class TestStoreFile extends HBaseTestCase {
.withFilePath(path)
.withDataBlockEncoder(dataBlockEncoder)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
.build();
writer.close();

View File

@ -133,7 +133,8 @@ public class TestCloseRegionHandler {
@Test public void testZKClosingNodeVersionMismatch()
throws IOException, NodeExistsException, KeeperException {
final Server server = new MockServer(HTU);
final RegionServerServices rss = new MockRegionServerServices();
final MockRegionServerServices rss = new MockRegionServerServices();
rss.setFileSystem(HTU.getTestFileSystem());
HTableDescriptor htd = TEST_HTD;
final HRegionInfo hri = TEST_HRI;
@ -169,7 +170,8 @@ public class TestCloseRegionHandler {
@Test public void testCloseRegion()
throws IOException, NodeExistsException, KeeperException {
final Server server = new MockServer(HTU);
final RegionServerServices rss = new MockRegionServerServices();
final MockRegionServerServices rss = new MockRegionServerServices();
rss.setFileSystem(HTU.getTestFileSystem());
HTableDescriptor htd = TEST_HTD;
HRegionInfo hri = TEST_HRI;

View File

@ -24,8 +24,10 @@ import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
@ -45,6 +47,7 @@ public class MockRegionServerServices implements RegionServerServices {
private boolean stopping = false;
private final ConcurrentSkipListMap<byte[], Boolean> rit =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
private HFileSystem hfs = null;
@Override
public boolean removeFromOnlineRegions(String encodedRegionName) {
@ -147,5 +150,13 @@ public class MockRegionServerServices implements RegionServerServices {
public boolean isAborted() {
return false;
}
@Override
public HFileSystem getFileSystem() {
return this.hfs;
}
public void setFileSystem(FileSystem hfs) {
this.hfs = (HFileSystem)hfs;
}
}