HBASE-11927 Use Native Hadoop Library for HFile checksum. (Apekshit)

Signed-off-by: stack <stack@apache.org>
Authored by Apekshit (Appy) Sharma on 2015-05-10 23:01:16 -07:00, committed by stack
parent fec091a807
commit 988593857f
10 changed files with 127 additions and 180 deletions
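
In short: HFile block checksums are now computed and verified through Hadoop's org.apache.hadoop.util.DataChecksum, which uses the native Hadoop library (libhadoop) for CRC32/CRC32C when it is loaded and falls back to pure Java otherwise, and the default algorithm moves from CRC32 to CRC32C. A minimal sketch of the Hadoop API this patch builds on; the probe class below and its names are illustrative only, not part of the commit:

    import org.apache.hadoop.util.DataChecksum;
    import org.apache.hadoop.util.NativeCodeLoader;

    // Hypothetical probe class, not part of this patch.
    public class NativeChecksumProbe {
      public static void main(String[] args) {
        // libhadoop may or may not be present; DataChecksum works either way,
        // it simply picks the fastest available CRC implementation internally.
        System.out.println("native hadoop loaded: " + NativeCodeLoader.isNativeCodeLoaded());

        DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 16 * 1024);
        System.out.println("bytesPerChecksum=" + sum.getBytesPerChecksum()
            + ", checksumSize=" + sum.getChecksumSize());
      }
    }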

File: HFileContext.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.util.ClassSize;
 public class HFileContext implements HeapSize, Cloneable {
   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
-  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
   /** Whether checksum is enabled or not**/
   private boolean usesHBaseChecksum = true;
@@ -48,7 +47,7 @@ public class HFileContext implements HeapSize, Cloneable {
   /** Whether tags to be compressed or not**/
   private boolean compressTags;
   /** the checksum type **/
-  private ChecksumType checksumType = DEFAULT_CHECKSUM_TYPE;
+  private ChecksumType checksumType = ChecksumType.getDefaultChecksumType();
   /** the number of bytes per checksum value **/
   private int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
   /** Number of uncompressed bytes we allow per block. */

File: HFileContextBuilder.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 public class HFileContextBuilder {
   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
-  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
   /** Whether checksum is enabled or not **/
   private boolean usesHBaseChecksum = true;
@@ -44,7 +43,7 @@ public class HFileContextBuilder {
   /** Whether tags to be compressed or not **/
   private boolean compressTags = false;
   /** the checksum type **/
-  private ChecksumType checksumType = DEFAULT_CHECKSUM_TYPE;
+  private ChecksumType checksumType = ChecksumType.getDefaultChecksumType();
   /** the number of bytes per checksum value **/
   private int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
   /** Number of uncompressed bytes we allow per block. */
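
As in HFileContext above, the duplicated DEFAULT_CHECKSUM_TYPE constant goes away and the builder now seeds its checksum type from ChecksumType.getDefaultChecksumType(). A small sketch of how a caller might pick a non-default algorithm through the builder; a plausible usage only, relying on the withChecksumType/withBytesPerChecksum setters that the tests in this commit also use, with an illustrative wrapper class:

    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
    import org.apache.hadoop.hbase.util.ChecksumType;

    public class HFileContextSketch {
      public static void main(String[] args) {
        // Defaults after this patch: CRC32C checksums, 16 KB per checksum chunk.
        HFileContext defaults = new HFileContextBuilder().build();

        // Explicitly keep the old CRC32 behaviour for a particular writer.
        HFileContext crc32 = new HFileContextBuilder()
            .withChecksumType(ChecksumType.CRC32)
            .withBytesPerChecksum(HFileContextBuilder.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        System.out.println(defaults + " / " + crc32);
      }
    }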

File: ChecksumType.java

@@ -18,13 +18,8 @@
 package org.apache.hadoop.hbase.util;

-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.zip.Checksum;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.DataChecksum;

 /**
  * Checksum types. The Checksum type is a one byte number
@@ -40,112 +35,50 @@ public enum ChecksumType {
     public String getName() {
       return "NULL";
     }
-    @Override
-    public void initialize() {
-      // do nothing
-    }
-    @Override
-    public Checksum getChecksumObject() throws IOException {
-      return null; // checksums not used
-    }
+    @Override public DataChecksum.Type getDataChecksumType() {
+      return DataChecksum.Type.NULL;
+    }
   },

   CRC32((byte)1) {
-    private transient Constructor<?> ctor;
     @Override
     public String getName() {
       return "CRC32";
     }
-    @Override
-    public void initialize() {
-      final String PURECRC32 = "org.apache.hadoop.util.PureJavaCrc32";
-      final String JDKCRC = "java.util.zip.CRC32";
-      LOG = LogFactory.getLog(ChecksumType.class);
-      // check if hadoop library is available
-      try {
-        ctor = ChecksumFactory.newConstructor(PURECRC32);
-        LOG.debug(PURECRC32 + " available");
-      } catch (Exception e) {
-        LOG.trace(PURECRC32 + " not available.");
-      }
-      try {
-        // The default checksum class name is java.util.zip.CRC32.
-        // This is available on all JVMs.
-        if (ctor == null) {
-          ctor = ChecksumFactory.newConstructor(JDKCRC);
-          LOG.debug(JDKCRC + " available");
-        }
-      } catch (Exception e) {
-        LOG.trace(JDKCRC + " not available.");
-      }
-    }
-    @Override
-    public Checksum getChecksumObject() throws IOException {
-      if (ctor == null) {
-        throw new IOException("Bad constructor for " + getName());
-      }
-      try {
-        return (Checksum)ctor.newInstance();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
+    @Override public DataChecksum.Type getDataChecksumType() {
+      return DataChecksum.Type.CRC32;
+    }
   },

   CRC32C((byte)2) {
-    private transient Constructor<?> ctor;
     @Override
     public String getName() {
       return "CRC32C";
     }
-    @Override
-    public void initialize() {
-      final String PURECRC32C = "org.apache.hadoop.util.PureJavaCrc32C";
-      LOG = LogFactory.getLog(ChecksumType.class);
-      try {
-        ctor = ChecksumFactory.newConstructor(PURECRC32C);
-        LOG.debug(PURECRC32C + " available");
-      } catch (Exception e) {
-        LOG.trace(PURECRC32C + " not available.");
-      }
-    }
-    @Override
-    public Checksum getChecksumObject() throws IOException {
-      if (ctor == null) {
-        throw new IOException("Bad constructor for " + getName());
-      }
-      try {
-        return (Checksum)ctor.newInstance();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
+    @Override public DataChecksum.Type getDataChecksumType() {
+      return DataChecksum.Type.CRC32C;
+    }
   };

   private final byte code;
-  protected Log LOG;

-  /** initializes the relevant checksum class object */
-  abstract void initialize();
+  public static ChecksumType getDefaultChecksumType() {
+    return ChecksumType.CRC32C;
+  }

   /** returns the name of this checksum type */
   public abstract String getName();

+  /** Function to get corresponding {@link org.apache.hadoop.util.DataChecksum.Type}. */
+  public abstract DataChecksum.Type getDataChecksumType();
+
   private ChecksumType(final byte c) {
     this.code = c;
-    initialize();
   }

-  /** returns a object that can be used to generate/validate checksums */
-  public abstract Checksum getChecksumObject() throws IOException;
-
   public byte getCode() {
     return this.code;
   }
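
The enum is now a thin mapping onto DataChecksum.Type; building the actual checksum object moves to the callers (ChecksumUtil below). A hedged sketch of the round trips the rest of the patch relies on; nameToType and codeToType already existed, only getDefaultChecksumType and getDataChecksumType are new, and the wrapper class here is illustrative:

    import org.apache.hadoop.hbase.util.ChecksumType;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumTypeMappingSketch {
      public static void main(String[] args) {
        ChecksumType dflt = ChecksumType.getDefaultChecksumType();      // CRC32C after this patch

        // Name and byte-code round trips, as used by HStore and ChecksumUtil.
        ChecksumType byName = ChecksumType.nameToType(dflt.getName());  // "CRC32C"
        ChecksumType byCode = ChecksumType.codeToType(dflt.getCode());  // (byte) 2

        // The new hook: hand the matching Hadoop type to DataChecksum.
        DataChecksum sum = DataChecksum.newDataChecksum(dflt.getDataChecksumType(), 16 * 1024);
        System.out.println(byName + " " + byCode + " " + sum);
      }
    }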

File: hbase-default.xml

@@ -1312,7 +1312,7 @@ possible configurations would overwhelm and obscure the important.
   </property>
   <property>
     <name>hbase.hstore.checksum.algorithm</name>
-    <value>CRC32</value>
+    <value>CRC32C</value>
     <description>
       Name of an algorithm that is used to compute checksums. Possible values
       are NULL, CRC32, CRC32C.
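
Only the shipped default changes; a cluster can still pin the previous algorithm (or disable HBase-level checksums with NULL) by overriding hbase.hstore.checksum.algorithm in hbase-site.xml or on a Configuration. A brief, hedged Java equivalent of that override; the wrapper class is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ChecksumConfigOverrideSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same effect as setting the property in hbase-site.xml.
        conf.set("hbase.hstore.checksum.algorithm", "CRC32");
        System.out.println(conf.get("hbase.hstore.checksum.algorithm"));
      }
    }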

File: ChecksumUtil.java

@@ -20,19 +20,21 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.zip.Checksum;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.util.DataChecksum;

 /**
  * Utility methods to compute and validate checksums.
  */
 @InterfaceAudience.Private
 public class ChecksumUtil {
+  public static final Log LOG = LogFactory.getLog(ChecksumUtil.class);

   /** This is used to reserve space in a byte buffer */
   private static byte[] DUMMY_VALUE = new byte[128 * HFileBlock.CHECKSUM_SIZE];
@@ -60,33 +62,20 @@ public class ChecksumUtil {
    * @param checksumType type of checksum
    * @param bytesPerChecksum number of bytes per checksum value
    */
-  static void generateChecksums(byte[] indata,
-    int startOffset, int endOffset,
-    byte[] outdata, int outOffset,
-    ChecksumType checksumType,
+  static void generateChecksums(byte[] indata, int startOffset, int endOffset,
+    byte[] outdata, int outOffset, ChecksumType checksumType,
     int bytesPerChecksum) throws IOException {
     if (checksumType == ChecksumType.NULL) {
-      return; // No checkums for this block.
+      return; // No checksum for this block.
     }

-    Checksum checksum = checksumType.getChecksumObject();
-    int bytesLeft = endOffset - startOffset;
-    int chunkNum = 0;
-
-    while (bytesLeft > 0) {
-      // generate the checksum for one chunk
-      checksum.reset();
-      int count = Math.min(bytesLeft, bytesPerChecksum);
-      checksum.update(indata, startOffset, count);
-
-      // write the checksum value to the output buffer.
-      int cksumValue = (int)checksum.getValue();
-      outOffset = Bytes.putInt(outdata, outOffset, cksumValue);
-
-      chunkNum++;
-      startOffset += count;
-      bytesLeft -= count;
-    }
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        checksumType.getDataChecksumType(), bytesPerChecksum);
+
+    checksum.calculateChunkedSums(
+        ByteBuffer.wrap(indata, startOffset, endOffset - startOffset),
+        ByteBuffer.wrap(outdata, outOffset, outdata.length - outOffset));
   }

   /**
@@ -98,7 +87,7 @@ public class ChecksumUtil {
    * The header is extracted from the specified HFileBlock while the
    * data-to-be-verified is extracted from 'data'.
    */
   static boolean validateBlockChecksum(Path path, HFileBlock block,
     byte[] data, int hdrSize) throws IOException {
     // If this is an older version of the block that does not have
@@ -117,65 +106,32 @@ public class ChecksumUtil {
     // always return true.
     ChecksumType cktype = ChecksumType.codeToType(block.getChecksumType());
     if (cktype == ChecksumType.NULL) {
-      return true; // No checkums validations needed for this block.
+      return true; // No checksum validations needed for this block.
     }

-    Checksum checksumObject = cktype.getChecksumObject();
-    checksumObject.reset();
-
     // read in the stored value of the checksum size from the header.
     int bytesPerChecksum = block.getBytesPerChecksum();

-    // bytesPerChecksum is always larger than the size of the header
-    if (bytesPerChecksum < hdrSize) {
-      String msg = "Unsupported value of bytesPerChecksum. " +
-                   " Minimum is " + hdrSize +
-                   " but the configured value is " + bytesPerChecksum;
-      HFile.LOG.warn(msg);
-      return false; // cannot happen case, unable to verify checksum
+    DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+        cktype.getDataChecksumType(), bytesPerChecksum);
+    assert dataChecksum != null;
+    int sizeWithHeader = block.getOnDiskDataSizeWithHeader();
+    if (LOG.isTraceEnabled()) {
+      LOG.info("length of data = " + data.length
+          + " OnDiskDataSizeWithHeader = " + sizeWithHeader
+          + " checksum type = " + cktype.getName()
+          + " file =" + path.toString()
+          + " header size = " + hdrSize
+          + " bytesPerChecksum = " + bytesPerChecksum);
     }
-
-    // Extract the header and compute checksum for the header.
-    ByteBuffer hdr = block.getBufferWithHeader();
-    if (hdr.hasArray()) {
-      checksumObject.update(hdr.array(), hdr.arrayOffset(), hdrSize);
-    } else {
-      checksumObject.update(ByteBufferUtils.toBytes(hdr, 0, hdrSize), 0, hdrSize);
-    }
-
-    int off = hdrSize;
-    int consumed = hdrSize;
-    int bytesLeft = block.getOnDiskDataSizeWithHeader() - off;
-    int cksumOffset = block.getOnDiskDataSizeWithHeader();
-
-    // validate each chunk
-    while (bytesLeft > 0) {
-      int thisChunkSize = bytesPerChecksum - consumed;
-      int count = Math.min(bytesLeft, thisChunkSize);
-      checksumObject.update(data, off, count);
-
-      int storedChecksum = Bytes.toInt(data, cksumOffset);
-      if (storedChecksum != (int)checksumObject.getValue()) {
-        String msg = "File " + path +
-                     " Stored checksum value of " + storedChecksum +
-                     " at offset " + cksumOffset +
-                     " does not match computed checksum " +
-                     checksumObject.getValue() +
-                     ", total data size " + data.length +
-                     " Checksum data range offset " + off + " len " + count +
-                     HFileBlock.toStringHeader(block.getBufferReadOnly());
-        HFile.LOG.warn(msg);
-        if (generateExceptions) {
-          throw new IOException(msg); // this is only for unit tests
-        } else {
-          return false; // checksum validation failure
-        }
-      }
-      cksumOffset += HFileBlock.CHECKSUM_SIZE;
-      bytesLeft -= count;
-      off += count;
-      consumed = 0;
-      checksumObject.reset();
-    }
-    return true; // checksum is valid
+    try {
+      dataChecksum.verifyChunkedSums(ByteBuffer.wrap(data, 0, sizeWithHeader),
+          ByteBuffer.wrap(data, sizeWithHeader, data.length - sizeWithHeader),
+          path.toString(), 0);
+    } catch (ChecksumException e) {
+      return false;
+    }
+    return true; // checksum is valid
   }

   /**
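
Both helpers now delegate the per-chunk loop to DataChecksum: generateChecksums wraps the input and output regions in ByteBuffers and calls calculateChunkedSums, while validateBlockChecksum calls verifyChunkedSums and converts the ChecksumException into the method's existing boolean contract. A self-contained round-trip sketch of that pattern, outside HBase and with illustrative names; sizing the sums array via getChecksumSize() is my assumption of typical usage, not code from the patch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.ChecksumException;
    import org.apache.hadoop.util.DataChecksum;

    public class ChunkedChecksumRoundTrip {
      public static void main(String[] args) throws ChecksumException {
        int bytesPerChecksum = 16 * 1024;
        DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, bytesPerChecksum);

        byte[] data = new byte[100 * 1024];   // stand-in for an HFile block payload
        int chunks = (data.length + bytesPerChecksum - 1) / bytesPerChecksum;
        byte[] sums = new byte[chunks * sum.getChecksumSize()];

        // One checksum per bytesPerChecksum-sized chunk, written into 'sums'.
        sum.calculateChunkedSums(ByteBuffer.wrap(data), ByteBuffer.wrap(sums));

        // Re-verify; throws ChecksumException (with file name and offset) on mismatch.
        sum.verifyChunkedSums(ByteBuffer.wrap(data), ByteBuffer.wrap(sums), "illustrative-file", 0);
        System.out.println("checksums verified for " + chunks + " chunks");
      }
    }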

File: HFile.java

@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
@@ -179,9 +178,6 @@ public class HFile {
    * The number of bytes per checksum.
    */
   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
-  // TODO: This define is done in three places. Fix.
-  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;

   // For measuring number of checksum failures
   static final AtomicLong checksumFailures = new AtomicLong();

File: HStore.java

@@ -433,7 +433,7 @@ public class HStore implements Store {
   public static ChecksumType getChecksumType(Configuration conf) {
     String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
     if (checksumName == null) {
-      return HFile.DEFAULT_CHECKSUM_TYPE;
+      return ChecksumType.getDefaultChecksumType();
     } else {
       return ChecksumType.nameToType(checksumName);
     }
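
When hbase.hstore.checksum.algorithm is unset, the store now falls back to ChecksumType.getDefaultChecksumType() instead of the removed HFile constant; an explicit name still wins. A short usage sketch: HConstants.CHECKSUM_TYPE_NAME is the property key shown in hbase-default.xml above, and the wrapper class is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.regionserver.HStore;
    import org.apache.hadoop.hbase.util.ChecksumType;

    public class ChecksumTypeResolutionSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        ChecksumType resolved = HStore.getChecksumType(conf);   // CRC32C unless overridden

        conf.set(HConstants.CHECKSUM_TYPE_NAME, "NULL");        // turn off HBase-level checksums
        ChecksumType disabled = HStore.getChecksumType(conf);   // ChecksumType.NULL
        System.out.println(resolved + " -> " + disabled);
      }
    }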

File: TestChecksum.java

@@ -22,12 +22,18 @@ package org.apache.hadoop.hbase.io.hfile;
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,6 +74,73 @@ public class TestChecksum {
     hfs = (HFileSystem)fs;
   }

+  @Test
+  public void testNewBlocksHaveDefaultChecksum() throws IOException {
+    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
+    FSDataOutputStream os = fs.create(path);
+    HFileContext meta = new HFileContextBuilder().build();
+    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
+    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+    for (int i = 0; i < 1000; ++i)
+      dos.writeInt(i);
+    hbw.writeHeaderAndData(os);
+    int totalSize = hbw.getOnDiskSizeWithHeader();
+    os.close();
+
+    // Use hbase checksums.
+    assertEquals(true, hfs.useHBaseChecksum());
+
+    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
+    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
+    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
+        is, totalSize, (HFileSystem) fs, path, meta);
+    HFileBlock b = hbr.readBlockData(0, -1, -1, false);
+    assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
+  }
+
+  /**
+   * Test all checksum types by writing and reading back blocks.
+   */
+  @Test
+  public void testAllChecksumTypes() throws IOException {
+    List<ChecksumType> cktypes = new ArrayList<>(Arrays.asList(ChecksumType.values()));
+    for (Iterator<ChecksumType> itr = cktypes.iterator(); itr.hasNext(); ) {
+      ChecksumType cktype = itr.next();
+      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
+      FSDataOutputStream os = fs.create(path);
+      HFileContext meta = new HFileContextBuilder()
+          .withChecksumType(cktype).build();
+      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
+      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+      for (int i = 0; i < 1000; ++i)
+        dos.writeInt(i);
+      hbw.writeHeaderAndData(os);
+      int totalSize = hbw.getOnDiskSizeWithHeader();
+      os.close();
+
+      // Use hbase checksums.
+      assertEquals(true, hfs.useHBaseChecksum());
+
+      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
+      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
+      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
+          is, totalSize, (HFileSystem) fs, path, meta);
+      HFileBlock b = hbr.readBlockData(0, -1, -1, false);
+      ByteBuffer data = b.getBufferWithoutHeader();
+      for (int i = 0; i < 1000; i++) {
+        assertEquals(i, data.getInt());
+      }
+      boolean exception_thrown = false;
+      try {
+        data.getInt();
+      } catch (BufferUnderflowException e) {
+        exception_thrown = true;
+      }
+      assertTrue(exception_thrown);
+      assertEquals(0, HFile.getChecksumFailuresCount());
+    }
+  }
+
   /**
    * Introduce checksum failures and check that we can still read
    * the data
@@ -256,16 +329,6 @@ public class TestChecksum {
     }
   }

-  /**
-   * Test to ensure that these is at least one valid checksum implementation
-   */
-  @Test
-  public void testChecksumAlgorithm() throws IOException {
-    ChecksumType type = ChecksumType.CRC32;
-    assertEquals(ChecksumType.nameToType(type.getName()), type);
-    assertEquals(ChecksumType.valueOf(type.toString()), type);
-  }
-
   private void validateData(DataInputStream in) throws IOException {
     // validate data
     for (int i = 0; i < 1234; i++) {

File: TestHFileBlock.java

@@ -250,7 +250,8 @@ public class TestHFileBlock {
   final String correctTestBlockStr =
       "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
           + "\\xFF\\xFF\\xFF\\xFF"
-          + "\\x01\\x00\\x00@\\x00\\x00\\x00\\x00["
+          + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
+          + "\\x00\\x00@\\x00\\x00\\x00\\x00["
       // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
       + "\\x1F\\x8B" // gzip magic signature
       + "\\x08"      // Compression method: 8 = "deflate"

File: TestStoreFile.java

@@ -77,7 +77,7 @@ public class TestStoreFile extends HBaseTestCase {
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
   private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestStoreFile").toString();
-  private static final ChecksumType CKTYPE = ChecksumType.CRC32;
+  private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
   private static final int CKBYTES = 512;
   private static String TEST_FAMILY = "cf";