From 259f8610ca52cb3bc1215b0e5d8893b5645d247e Mon Sep 17 00:00:00 2001
From: huzheng
Date: Mon, 18 Feb 2019 17:12:23 +0800
Subject: [PATCH] HBASE-21917 Make HFileBlock#validateChecksum accept
 ByteBuff as an input.

---
 .../hadoop/hbase/io/hfile/ChecksumUtil.java   | 153 ++++++++++++------
 .../hadoop/hbase/io/hfile/HFileBlock.java     |  14 +-
 .../hadoop/hbase/io/hfile/TestChecksum.java   |  64 +++++---
 3 files changed, 151 insertions(+), 80 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
index 5eb182640fa..5317f0e3a63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +36,7 @@ import org.apache.hadoop.util.DataChecksum;
 public class ChecksumUtil {
   public static final Logger LOG = LoggerFactory.getLogger(ChecksumUtil.class);
 
-  /** This is used to reserve space in a byte buffer */
-  private static byte[] DUMMY_VALUE = new byte[128 * HFileBlock.CHECKSUM_SIZE];
+  public static final int CHECKSUM_BUF_SIZE = 256;
 
   /**
    * This is used by unit tests to make checksum failures throw an
@@ -77,51 +77,119 @@ public class ChecksumUtil {
       ByteBuffer.wrap(outdata, outOffset, outdata.length - outOffset));
   }
 
+  /**
+   * Like Hadoop's {@link DataChecksum#verifyChunkedSums(ByteBuffer, ByteBuffer, String, long)},
+   * this method verifies the checksum of each chunk of the data. The difference is that it
+   * accepts {@link ByteBuff} arguments; since we cannot add that to hadoop-common, it lives here.
+   * @param dataChecksum the checksum algorithm to use.
+   * @param data the data to verify.
+   * @param checksums the stored checksums to compare against.
+   * @param pathName the file the data was read from; only used for logging.
+   * @return true if every chunk's checksum matches, false otherwise.
+   * @see org.apache.hadoop.util.DataChecksum#verifyChunkedSums(ByteBuffer, ByteBuffer, String,
+   *      long)
+   */
+  private static boolean verifyChunkedSums(DataChecksum dataChecksum, ByteBuff data,
+      ByteBuff checksums, String pathName) {
+    // Almost all HFile blocks are about 64KB, so they are backed by a SingleByteBuff. In that case
+    // use Hadoop's checksum verification directly, because it uses the native checksum code and
+    // needs no extra byte[] allocation or copying. (HBASE-21917)
+    if (data instanceof SingleByteBuff && checksums instanceof SingleByteBuff) {
+      // The checksums ByteBuff must also be a SingleByteBuff because it is duplicated from data.
+      ByteBuffer dataBB = (ByteBuffer) (data.nioByteBuffers()[0]).duplicate()
+          .position(data.position()).limit(data.limit());
+      ByteBuffer checksumBB = (ByteBuffer) (checksums.nioByteBuffers()[0]).duplicate()
+          .position(checksums.position()).limit(checksums.limit());
+      try {
+        dataChecksum.verifyChunkedSums(dataBB, checksumBB, pathName, 0);
+        return true;
+      } catch (ChecksumException e) {
+        return false;
+      }
+    }
+
+    // Only when the data block is larger than 4MB (the default buffer size in BucketCache) will the
+    // block be a MultiByteBuff. In that rare case we copy the data through a small byte[] in slices
+    // to update the checksum, which keeps GC pressure low.
+    int checksumTypeSize = dataChecksum.getChecksumType().size;
+    if (checksumTypeSize == 0) {
+      return true;
+    }
+    // We have 5 checksum types now: NULL, DEFAULT, MIXED, CRC32 and CRC32C. The first three carry a
+    // 0-byte checksum, the last two carry 4 bytes each.
+    assert checksumTypeSize == 4;
+
+    int bytesPerChecksum = dataChecksum.getBytesPerChecksum();
+    int startDataPos = data.position();
+    data.mark();
+    checksums.mark();
+    try {
+      // Allocate a small buffer to reduce young-gen GC (HBASE-21917) and copy at most 256 bytes
+      // from the ByteBuff per checksum update. If we move to a future JDK and Hadoop version whose
+      // DataChecksum#update accepts a ByteBuffer, we will no longer need to update the checksum in
+      // several passes.
+      byte[] buf = new byte[CHECKSUM_BUF_SIZE];
+      byte[] sum = new byte[checksumTypeSize];
+      while (data.remaining() > 0) {
+        int n = Math.min(data.remaining(), bytesPerChecksum);
+        checksums.get(sum);
+        dataChecksum.reset();
+        for (int remain = n, len; remain > 0; remain -= len) {
+          // Copy up to 256 bytes from the ByteBuff for each checksum update; if fewer than 256
+          // bytes remain, just update with what is left.
+          len = Math.min(CHECKSUM_BUF_SIZE, remain);
+          data.get(buf, 0, len);
+          dataChecksum.update(buf, 0, len);
+        }
+        int calculated = (int) dataChecksum.getValue();
+        int stored = (sum[0] << 24 & 0xff000000) | (sum[1] << 16 & 0xff0000)
+            | (sum[2] << 8 & 0xff00) | (sum[3] & 0xff);
+        if (calculated != stored) {
+          if (LOG.isTraceEnabled()) {
+            long errPos = data.position() - startDataPos - n;
+            LOG.trace("Checksum error: {} at {} expected: {} got: {}", pathName, errPos, stored,
+                calculated);
+          }
+          return false;
+        }
+      }
+    } finally {
+      data.reset();
+      checksums.reset();
+    }
+    return true;
+  }
+
   /**
    * Validates that the data in the specified HFileBlock matches the checksum. Generates the
    * checksums for the data and then validate that it matches those stored in the end of the data.
-   * @param buffer Contains the data in following order: HFileBlock header, data, checksums.
+   * @param buf Contains the data in the following order: HFileBlock header, data, checksums.
    * @param pathName Path of the HFile to which the {@code data} belongs. Only used for logging.
    * @param offset offset of the data being validated. Only used for logging.
    * @param hdrSize Size of the block header in {@code data}. Only used for logging.
    * @return True if checksum matches, else false.
    */
-  static boolean validateChecksum(ByteBuffer buffer, String pathName, long offset, int hdrSize)
-      throws IOException {
-    // A ChecksumType.NULL indicates that the caller is not interested in validating checksums,
-    // so we always return true.
-    ChecksumType cktype =
-        ChecksumType.codeToType(buffer.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX));
-    if (cktype == ChecksumType.NULL) {
-      return true; // No checksum validations needed for this block.
+  static boolean validateChecksum(ByteBuff buf, String pathName, long offset, int hdrSize) {
+    ChecksumType ctype = ChecksumType.codeToType(buf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX));
+    if (ctype == ChecksumType.NULL) {
+      return true; // No checksum validations needed for this block.
     }
 
     // read in the stored value of the checksum size from the header.
-    int bytesPerChecksum = buffer.getInt(HFileBlock.Header.BYTES_PER_CHECKSUM_INDEX);
-
-    DataChecksum dataChecksum = DataChecksum.newDataChecksum(
-        cktype.getDataChecksumType(), bytesPerChecksum);
+    int bytesPerChecksum = buf.getInt(HFileBlock.Header.BYTES_PER_CHECKSUM_INDEX);
+    DataChecksum dataChecksum =
+        DataChecksum.newDataChecksum(ctype.getDataChecksumType(), bytesPerChecksum);
     assert dataChecksum != null;
     int onDiskDataSizeWithHeader =
-        buffer.getInt(HFileBlock.Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
+        buf.getInt(HFileBlock.Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
     if (LOG.isTraceEnabled()) {
-      LOG.info("dataLength=" + buffer.capacity()
-          + ", sizeWithHeader=" + onDiskDataSizeWithHeader
-          + ", checksumType=" + cktype.getName()
-          + ", file=" + pathName
-          + ", offset=" + offset
-          + ", headerSize=" + hdrSize
-          + ", bytesPerChecksum=" + bytesPerChecksum);
+      LOG.info("dataLength=" + buf.capacity() + ", sizeWithHeader=" + onDiskDataSizeWithHeader
+          + ", checksumType=" + ctype.getName() + ", file=" + pathName + ", offset=" + offset
+          + ", headerSize=" + hdrSize + ", bytesPerChecksum=" + bytesPerChecksum);
     }
-    try {
-      ByteBuffer data = (ByteBuffer) buffer.duplicate().position(0).limit(onDiskDataSizeWithHeader);
-      ByteBuffer checksums = (ByteBuffer) buffer.duplicate().position(onDiskDataSizeWithHeader)
-          .limit(buffer.capacity());
-      dataChecksum.verifyChunkedSums(data, checksums, pathName, 0);
-    } catch (ChecksumException e) {
-      return false;
-    }
-    return true; // checksum is valid
+    ByteBuff data = buf.duplicate().position(0).limit(onDiskDataSizeWithHeader);
+    ByteBuff checksums = buf.duplicate().position(onDiskDataSizeWithHeader).limit(buf.limit());
+    return verifyChunkedSums(dataChecksum, data, checksums, pathName);
   }
 
   /**
@@ -150,25 +218,6 @@ public class ChecksumUtil {
     return numChunks;
   }
 
-  /**
-   * Write dummy checksums to the end of the specified bytes array
-   * to reserve space for writing checksums later
-   * @param baos OutputStream to write dummy checkum values
-   * @param numBytes Number of bytes of data for which dummy checksums
-   *   need to be generated
-   * @param bytesPerChecksum Number of bytes per checksum value
-   */
-  static void reserveSpaceForChecksums(ByteArrayOutputStream baos,
-    int numBytes, int bytesPerChecksum) throws IOException {
-    long numChunks = numChunks(numBytes, bytesPerChecksum);
-    long bytesLeft = numChunks * HFileBlock.CHECKSUM_SIZE;
-    while (bytesLeft > 0) {
-      long count = Math.min(bytesLeft, DUMMY_VALUE.length);
-      baos.write(DUMMY_VALUE, 0, (int)count);
-      bytesLeft -= count;
-    }
-  }
-
   /**
    * Mechanism to throw an exception in case of hbase checksum
    * failure. This is used by unit tests only.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index f837b6bcd98..2b8f4b95dbb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1781,10 +1781,10 @@ public class HFileBlock implements Cacheable {
       // Do a few checks before we go instantiate HFileBlock.
       assert onDiskSizeWithHeader > this.hdrSize;
       verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
-      ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
+      ByteBuff onDiskBlockByteBuff =
+          new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader));
       // Verify checksum of the data before using it for building HFileBlock.
-      if (verifyChecksum &&
-          !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
+      if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuff, hdrSize)) {
         return null;
       }
       long duration = System.currentTimeMillis() - startTime;
@@ -1794,9 +1794,8 @@
       // The onDiskBlock will become the headerAndDataBuffer for this block.
       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
       // contains the header of next block, so no need to set next block's header in it.
-      HFileBlock hFileBlock =
-          new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer), checksumSupport,
-              MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
+      HFileBlock hFileBlock = new HFileBlock(onDiskBlockByteBuff, checksumSupport,
+          MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
       // Run check on uncompressed sizings.
       if (!fileContext.isCompressedOrEncrypted()) {
         hFileBlock.sanityCheckUncompressed();
@@ -1835,8 +1834,7 @@
      * If the block doesn't uses checksum, returns false.
      * @return True if checksum matches, else false.
      */
-    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
-        throws IOException {
+    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
       // If this is an older version of the block that does not have checksums, then return false
       // indicating that checksum verification did not succeed. Actually, this method should never
       // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index de28422570f..e93b61eac9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -21,16 +21,15 @@ import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.BufferUnderflowException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -102,22 +103,35 @@ public class TestChecksum {
     assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
   }
 
-  /**
-   * Test all checksum types by writing and reading back blocks.
-   */
+  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
+    int size = buf.remaining() / 2 + 1;
+    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
+        .position(0).limit(buf.remaining());
+    for (int i = buf.position(); i < buf.limit(); i++) {
+      mbb.put(buf.get(i));
+    }
+    mbb.position(0).limit(buf.remaining());
+    assertEquals(mbb.remaining(), buf.remaining());
+    assertTrue(mbb.remaining() > size);
+    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
+  }
+
+  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
+    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
+  }
+
   @Test
-  public void testAllChecksumTypes() throws IOException {
-    List<ChecksumType> cktypes = new ArrayList<>(Arrays.asList(ChecksumType.values()));
-    for (Iterator<ChecksumType> itr = cktypes.iterator(); itr.hasNext(); ) {
-      ChecksumType cktype = itr.next();
-      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
+  public void testVerifyCheckSum() throws IOException {
+    int intCount = 10000;
+    for (ChecksumType ckt : ChecksumType.values()) {
+      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
       FSDataOutputStream os = fs.create(path);
       HFileContext meta = new HFileContextBuilder()
-            .withChecksumType(cktype)
-            .build();
+          .withChecksumType(ckt)
+          .build();
       HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
       DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-      for (int i = 0; i < 1000; ++i) {
+      for (int i = 0; i < intCount; ++i) {
        dos.writeInt(i);
       }
       hbw.writeHeaderAndData(os);
@@ -130,19 +144,25 @@
       FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
       meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
-          is, totalSize, (HFileSystem) fs, path, meta);
+        is, totalSize, (HFileSystem) fs, path, meta);
       HFileBlock b = hbr.readBlockData(0, -1, false, false);
+
+      // verify SingleByteBuff checksum.
+      verifySBBCheckSum(b.getBufferReadOnly());
+
+      // verify MultiByteBuff checksum.
+      verifyMBBCheckSum(b.getBufferReadOnly());
+
       ByteBuff data = b.getBufferWithoutHeader();
-      for (int i = 0; i < 1000; i++) {
+      for (int i = 0; i < intCount; i++) {
        assertEquals(i, data.getInt());
       }
-      boolean exception_thrown = false;
       try {
        data.getInt();
+        fail();
       } catch (BufferUnderflowException e) {
-        exception_thrown = true;
+        // expected failure
       }
-      assertTrue(exception_thrown);
       assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     }
   }
@@ -216,16 +236,19 @@
       for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
         b = hbr.readBlockData(0, -1, pread, false);
+        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         assertEquals(0, HFile.getAndResetChecksumFailuresCount());
       }
 
       // The next read should have hbase checksum verification reanabled,
       // we verify this by assertng that there was a hbase-checksum failure.
       b = hbr.readBlockData(0, -1, pread, false);
+      assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
       assertEquals(1, HFile.getAndResetChecksumFailuresCount());
 
       // Since the above encountered a checksum failure, we switch
       // back to not checking hbase checksums.
       b = hbr.readBlockData(0, -1, pread, false);
+      assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
       assertEquals(0, HFile.getAndResetChecksumFailuresCount());
 
       is.close();
@@ -319,6 +342,7 @@
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
           is, nochecksum), totalSize, hfs, path, meta);
       HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+      assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
       is.close();
       b.sanityCheck();
       assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
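
As a reference for the MultiByteBuff branch in the ChecksumUtil change above, the chunked verification loop can be sketched in isolation. The sketch below is illustrative only: the class name is made up, and it substitutes java.util.zip.CRC32 and plain ByteBuffers for Hadoop's DataChecksum and HBase's ByteBuff; the 256-byte copy buffer, the per-chunk reset, and the 4-byte big-endian stored sums mirror what the patch does.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Illustrative sketch only; the real code uses Hadoop's DataChecksum and HBase's ByteBuff. */
public class ChunkedChecksumSketch {

  /** Small copy buffer, mirroring ChecksumUtil.CHECKSUM_BUF_SIZE, to keep young-gen garbage low. */
  static final int CHECKSUM_BUF_SIZE = 256;

  /**
   * Verify one 4-byte CRC per bytesPerChecksum-sized chunk of data. The checksums buffer holds
   * the stored sums as big-endian ints, in chunk order.
   */
  static boolean verifyChunkedSums(ByteBuffer data, ByteBuffer checksums, int bytesPerChecksum) {
    byte[] buf = new byte[CHECKSUM_BUF_SIZE];
    CRC32 crc = new CRC32();
    while (data.remaining() > 0) {
      int n = Math.min(data.remaining(), bytesPerChecksum); // the last chunk may be short
      int stored = checksums.getInt();                      // stored sum for this chunk
      crc.reset();
      for (int remain = n, len; remain > 0; remain -= len) {
        len = Math.min(CHECKSUM_BUF_SIZE, remain);          // copy at most 256 bytes per update
        data.get(buf, 0, len);
        crc.update(buf, 0, len);
      }
      if ((int) crc.getValue() != stored) {
        return false;                                       // mismatch in this chunk
      }
    }
    return true;
  }
}
```

The small fixed-size copy buffer is the design point: one short-lived byte[] per call instead of a full-block copy, at the cost of several update() calls per chunk.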
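A hypothetical round trip for the sketch above: compute one CRC per bytesPerChecksum chunk, append the 4-byte sums after the data (the same data-then-checksums layout the block carries on disk), then split the combined buffer into data and checksum views with duplicate()/position()/limit(), which is the idiom the new validateChecksum applies to its ByteBuff. All sizes and names here are invented for illustration.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Illustrative round trip for the sketch above; layout and names are hypothetical. */
public class ChecksumSplitExample {

  public static void main(String[] args) {
    int bytesPerChecksum = 512;
    byte[] payload = new byte[1300];          // three chunks: 512 + 512 + 276
    for (int i = 0; i < payload.length; i++) {
      payload[i] = (byte) i;
    }

    // Lay the buffer out as data first, then one 4-byte checksum per chunk appended at the end.
    int numChunks = (payload.length + bytesPerChecksum - 1) / bytesPerChecksum;
    ByteBuffer block = ByteBuffer.allocate(payload.length + 4 * numChunks);
    block.put(payload);
    CRC32 crc = new CRC32();
    for (int off = 0; off < payload.length; off += bytesPerChecksum) {
      int len = Math.min(bytesPerChecksum, payload.length - off);
      crc.reset();
      crc.update(payload, off, len);
      block.putInt((int) crc.getValue());     // stored as a big-endian int
    }

    // Split the combined buffer into a data view and a checksum view with duplicate(),
    // position() and limit(), the same way validateChecksum splits its ByteBuff.
    ByteBuffer data = (ByteBuffer) block.duplicate().position(0).limit(payload.length);
    ByteBuffer checksums =
        (ByteBuffer) block.duplicate().position(payload.length).limit(block.capacity());

    boolean ok = ChunkedChecksumSketch.verifyChunkedSums(data, checksums, bytesPerChecksum);
    System.out.println("checksums match: " + ok);   // expected: true
  }
}
```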