HBASE-11625 - Verifies data before building HFileBlock. - Adds HFileBlock.Header class which contains information about location of fields. Testing: Adds CorruptedFSReaderImpl to TestChecksum.
Change-Id: I107e9636b28abb6b15ec329e885f1e31b1b1b988 Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
9ee0cbb995
commit
48b77dbcbc
|
@ -78,46 +78,35 @@ public class ChecksumUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Validates that the data in the specified HFileBlock matches the
|
||||
* checksum. Generates the checksum for the data and
|
||||
* then validate that it matches the value stored in the header.
|
||||
* If there is a checksum mismatch, then return false. Otherwise
|
||||
* return true.
|
||||
* The header is extracted from the specified HFileBlock while the
|
||||
* data-to-be-verified is extracted from 'data'.
|
||||
* Validates that the data in the specified HFileBlock matches the checksum. Generates the
|
||||
* checksums for the data and then validate that it matches those stored in the end of the data.
|
||||
* @param buffer Contains the data in following order: HFileBlock header, data, checksums.
|
||||
* @param pathName Path of the HFile to which the {@code data} belongs. Only used for logging.
|
||||
* @param offset offset of the data being validated. Only used for logging.
|
||||
* @param hdrSize Size of the block header in {@code data}. Only used for logging.
|
||||
* @return True if checksum matches, else false.
|
||||
*/
|
||||
static boolean validateBlockChecksum(String pathName, long offset, HFileBlock block,
|
||||
byte[] data, int hdrSize) throws IOException {
|
||||
|
||||
// If this is an older version of the block that does not have
|
||||
// checksums, then return false indicating that checksum verification
|
||||
// did not succeed. Actually, this method should never be called
|
||||
// when the minorVersion is 0, thus this is a defensive check for a
|
||||
// cannot-happen case. Since this is a cannot-happen case, it is
|
||||
// better to return false to indicate a checksum validation failure.
|
||||
if (!block.getHFileContext().isUseHBaseChecksum()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Get a checksum object based on the type of checksum that is
|
||||
// set in the HFileBlock header. A ChecksumType.NULL indicates that
|
||||
// the caller is not interested in validating checksums, so we
|
||||
// always return true.
|
||||
ChecksumType cktype = ChecksumType.codeToType(block.getChecksumType());
|
||||
static boolean validateChecksum(ByteBuffer buffer, String pathName, long offset, int hdrSize)
|
||||
throws IOException {
|
||||
// A ChecksumType.NULL indicates that the caller is not interested in validating checksums,
|
||||
// so we always return true.
|
||||
ChecksumType cktype =
|
||||
ChecksumType.codeToType(buffer.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX));
|
||||
if (cktype == ChecksumType.NULL) {
|
||||
return true; // No checksum validations needed for this block.
|
||||
}
|
||||
|
||||
// read in the stored value of the checksum size from the header.
|
||||
int bytesPerChecksum = block.getBytesPerChecksum();
|
||||
int bytesPerChecksum = buffer.getInt(HFileBlock.Header.BYTES_PER_CHECKSUM_INDEX);
|
||||
|
||||
DataChecksum dataChecksum = DataChecksum.newDataChecksum(
|
||||
cktype.getDataChecksumType(), bytesPerChecksum);
|
||||
assert dataChecksum != null;
|
||||
int sizeWithHeader = block.getOnDiskDataSizeWithHeader();
|
||||
int onDiskDataSizeWithHeader =
|
||||
buffer.getInt(HFileBlock.Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.info("dataLength=" + data.length
|
||||
+ ", sizeWithHeader=" + sizeWithHeader
|
||||
LOG.info("dataLength=" + buffer.capacity()
|
||||
+ ", sizeWithHeader=" + onDiskDataSizeWithHeader
|
||||
+ ", checksumType=" + cktype.getName()
|
||||
+ ", file=" + pathName
|
||||
+ ", offset=" + offset
|
||||
|
@ -125,8 +114,10 @@ public class ChecksumUtil {
|
|||
+ ", bytesPerChecksum=" + bytesPerChecksum);
|
||||
}
|
||||
try {
|
||||
dataChecksum.verifyChunkedSums(ByteBuffer.wrap(data, 0, sizeWithHeader),
|
||||
ByteBuffer.wrap(data, sizeWithHeader, data.length - sizeWithHeader), pathName, 0);
|
||||
ByteBuffer data = (ByteBuffer) buffer.duplicate().position(0).limit(onDiskDataSizeWithHeader);
|
||||
ByteBuffer checksums = (ByteBuffer) buffer.duplicate().position(onDiskDataSizeWithHeader)
|
||||
.limit(buffer.capacity());
|
||||
dataChecksum.verifyChunkedSums(data, checksums, pathName, 0);
|
||||
} catch (ChecksumException e) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -276,6 +276,26 @@ public class HFileBlock implements Cacheable {
|
|||
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
|
||||
}
|
||||
|
||||
// Todo: encapsulate Header related logic in this inner class.
|
||||
static class Header {
|
||||
// Format of header is:
|
||||
// 8 bytes - block magic
|
||||
// 4 bytes int - onDiskSizeWithoutHeader
|
||||
// 4 bytes int - uncompressedSizeWithoutHeader
|
||||
// 8 bytes long - prevBlockOffset
|
||||
// The following 3 are only present if header contains checksum information
|
||||
// 1 byte - checksum type
|
||||
// 4 byte int - bytes per checksum
|
||||
// 4 byte int - onDiskDataSizeWithHeader
|
||||
static int BLOCK_MAGIC_INDEX = 0;
|
||||
static int ON_DISK_SIZE_WITHOUT_HEADER_INDEX = 8;
|
||||
static int UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX = 12;
|
||||
static int PREV_BLOCK_OFFSET_INDEX = 16;
|
||||
static int CHECKSUM_TYPE_INDEX = 24;
|
||||
static int BYTES_PER_CHECKSUM_INDEX = 25;
|
||||
static int ON_DISK_DATA_SIZE_WITH_HEADER_INDEX = 29;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor. Creates a shallow copy of {@code that}'s buffer.
|
||||
*/
|
||||
|
@ -305,7 +325,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
|
||||
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
|
||||
* @param prevBlockOffset see {@link #prevBlockOffset}
|
||||
* @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
|
||||
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
|
||||
* uncompressed data.
|
||||
* @param fillHeader when true, write the first 4 header fields into passed buffer.
|
||||
* @param offset the file offset the block was read from
|
||||
|
@ -335,12 +355,13 @@ public class HFileBlock implements Cacheable {
|
|||
final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException {
|
||||
buf.rewind();
|
||||
final BlockType blockType = BlockType.read(buf);
|
||||
final int onDiskSizeWithoutHeader = buf.getInt();
|
||||
final int uncompressedSizeWithoutHeader = buf.getInt();
|
||||
final long prevBlockOffset = buf.getLong();
|
||||
byte checksumType = buf.get();
|
||||
int bytesPerChecksum = buf.getInt();
|
||||
int onDiskDataSizeWithHeader = buf.getInt();
|
||||
final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
|
||||
final int uncompressedSizeWithoutHeader =
|
||||
buf.getInt(Header.UNCOMPRESSED_SIZE_WITHOUT_HEADER_INDEX);
|
||||
final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
|
||||
byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
|
||||
int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
|
||||
int onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
|
||||
// This constructor is called when we deserialize a block from cache and when we read a block in
|
||||
// from the fs. fileCache is null when deserialized from cache so need to make up one.
|
||||
HFileContextBuilder fileContextBuilder = fileContext != null?
|
||||
|
@ -384,14 +405,13 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Parse total ondisk size including header and checksum. Its second field in header after
|
||||
* the magic bytes.
|
||||
* Parse total ondisk size including header and checksum.
|
||||
* @param headerBuf Header ByteBuffer. Presumed exact size of header.
|
||||
* @return Size of the block with header included.
|
||||
*/
|
||||
private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf) {
|
||||
// Set hbase checksum to true always calling headerSize.
|
||||
return headerBuf.getInt(BlockType.MAGIC_LENGTH) + headerSize(true);
|
||||
return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1459,8 +1479,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @throws IOException
|
||||
*/
|
||||
protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
|
||||
boolean peekIntoNextBlock, long fileOffset, boolean pread)
|
||||
throws IOException {
|
||||
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
|
||||
if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
|
||||
// We are asked to read the next block's header as well, but there is
|
||||
// not enough room in the array.
|
||||
|
@ -1647,9 +1666,8 @@ public class HFileBlock implements Cacheable {
|
|||
* If HBase checksum is switched off, then use HDFS checksum.
|
||||
* @return the HFileBlock or null if there is a HBase checksum mismatch
|
||||
*/
|
||||
private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
|
||||
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
|
||||
throws IOException {
|
||||
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
|
||||
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException {
|
||||
if (offset < 0) {
|
||||
throw new IOException("Invalid offset=" + offset + " trying to read "
|
||||
+ "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
|
||||
|
@ -1691,20 +1709,23 @@ public class HFileBlock implements Cacheable {
|
|||
// Do a few checks before we go instantiate HFileBlock.
|
||||
assert onDiskSizeWithHeader > this.hdrSize;
|
||||
verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset);
|
||||
ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
|
||||
// Verify checksum of the data before using it for building HFileBlock.
|
||||
if (verifyChecksum &&
|
||||
!validateChecksum(offset, onDiskBlockByteBuffer.asReadOnlyBuffer(), hdrSize)) {
|
||||
return null;
|
||||
}
|
||||
// The onDiskBlock will become the headerAndDataBuffer for this block.
|
||||
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
|
||||
// contains the header of next block, so no need to set next block's header in it.
|
||||
HFileBlock hFileBlock =
|
||||
new HFileBlock(new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader)),
|
||||
new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer),
|
||||
this.fileContext.isUseHBaseChecksum(), MemoryType.EXCLUSIVE, offset,
|
||||
nextBlockOnDiskSize, fileContext);
|
||||
// Run check on uncompressed sizings.
|
||||
if (!fileContext.isCompressedOrEncrypted()) {
|
||||
hFileBlock.sanityCheckUncompressed();
|
||||
}
|
||||
if (verifyChecksum && !validateBlockChecksum(hFileBlock, offset, onDiskBlock, hdrSize)) {
|
||||
return null;
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Read " + hFileBlock);
|
||||
}
|
||||
|
@ -1737,15 +1758,21 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Generates the checksum for the header as well as the data and
|
||||
* then validates that it matches the value stored in the header.
|
||||
* If there is a checksum mismatch, then return false. Otherwise
|
||||
* return true.
|
||||
* Generates the checksum for the header as well as the data and then validates it.
|
||||
* If the block doesn't uses checksum, returns false.
|
||||
* @return True if checksum matches, else false.
|
||||
*/
|
||||
protected boolean validateBlockChecksum(HFileBlock block, long offset, byte[] data,
|
||||
int hdrSize)
|
||||
throws IOException {
|
||||
return ChecksumUtil.validateBlockChecksum(pathName, offset, block, data, hdrSize);
|
||||
protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
|
||||
throws IOException {
|
||||
// If this is an older version of the block that does not have checksums, then return false
|
||||
// indicating that checksum verification did not succeed. Actually, this method should never
|
||||
// be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
|
||||
// case. Since this is a cannot-happen case, it is better to return false to indicate a
|
||||
// checksum validation failure.
|
||||
if (!fileContext.isUseHBaseChecksum()) {
|
||||
return false;
|
||||
}
|
||||
return ChecksumUtil.validateChecksum(data, pathName, offset, hdrSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -108,9 +108,9 @@ public class TestChecksum {
|
|||
ChecksumType cktype = itr.next();
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
|
||||
FSDataOutputStream os = fs.create(path);
|
||||
HFileContext meta = new HFileContextBuilder().
|
||||
withChecksumType(cktype).
|
||||
build();
|
||||
HFileContext meta = new HFileContextBuilder()
|
||||
.withChecksumType(cktype)
|
||||
.build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
||||
for (int i = 0; i < 1000; ++i) {
|
||||
|
@ -189,7 +189,7 @@ public class TestChecksum {
|
|||
.withIncludesTags(useTags)
|
||||
.withHBaseCheckSum(true)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
|
||||
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, pread);
|
||||
b.sanityCheck();
|
||||
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
|
||||
|
@ -231,7 +231,7 @@ public class TestChecksum {
|
|||
HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
|
||||
assertEquals(false, newfs.useHBaseChecksum());
|
||||
is = new FSDataInputStreamWrapper(newfs, path);
|
||||
hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
|
||||
hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
|
||||
b = hbr.readBlockData(0, -1, pread);
|
||||
is.close();
|
||||
b.sanityCheck();
|
||||
|
@ -340,20 +340,56 @@ public class TestChecksum {
|
|||
}
|
||||
|
||||
/**
|
||||
* A class that introduces hbase-checksum failures while
|
||||
* reading data from hfiles. This should trigger the hdfs level
|
||||
* checksum validations.
|
||||
* This class is to test checksum behavior when data is corrupted. It mimics the following
|
||||
* behavior:
|
||||
* - When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
|
||||
* is true, it means hbase checksum is on and fs checksum is off, so we corrupt the data.
|
||||
* - When fs checksum is enabled, hdfs will get a different copy from another node, and will
|
||||
* always return correct data. So we don't corrupt the data when verifyChecksum for hbase is
|
||||
* off.
|
||||
*/
|
||||
static private class FSReaderImplTest extends HFileBlock.FSReaderImpl {
|
||||
public FSReaderImplTest(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
|
||||
static private class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
|
||||
/**
|
||||
* If set to true, corrupt reads using readAtOffset(...).
|
||||
*/
|
||||
boolean corruptDataStream = false;
|
||||
|
||||
public CorruptedFSReaderImpl(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
|
||||
Path path, HFileContext meta) throws IOException {
|
||||
super(istream, fileSize, (HFileSystem) fs, path, meta);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean validateBlockChecksum(HFileBlock block, long offset,
|
||||
byte[] data, int hdrSize) throws IOException {
|
||||
return false; // checksum validation failure
|
||||
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
|
||||
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException {
|
||||
if (verifyChecksum) {
|
||||
corruptDataStream = true;
|
||||
}
|
||||
HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
|
||||
verifyChecksum);
|
||||
corruptDataStream = false;
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
|
||||
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
|
||||
int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock,
|
||||
fileOffset, pread);
|
||||
if (!corruptDataStream) {
|
||||
return returnValue;
|
||||
}
|
||||
// Corrupt 3rd character of block magic of next block's header.
|
||||
if (peekIntoNextBlock) {
|
||||
dest[destOffset + size + 3] = 0b00000000;
|
||||
}
|
||||
// We might be reading this block's header too, corrupt it.
|
||||
dest[destOffset + 1] = 0b00000000;
|
||||
// Corrupt non header data
|
||||
if (size > hdrSize) {
|
||||
dest[destOffset + hdrSize + 1] = 0b00000000;
|
||||
}
|
||||
return returnValue;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue