diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 68c80fd9512..e9bc08f7d15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -493,18 +493,8 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuff getBufferWithoutHeader() {
-    return this.getBufferWithoutHeader(false);
-  }
-
-  /**
-   * Returns a buffer that does not include the header or checksum.
-   * @param withChecksum to indicate whether include the checksum or not.
-   * @return the buffer with header skipped and checksum omitted.
-   */
-  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
     ByteBuff dup = getBufferReadOnly();
-    int delta = withChecksum ? 0 : totalChecksumBytes();
-    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
+    return dup.position(headerSize()).slice();
   }
 
   /**
@@ -568,19 +558,21 @@ public class HFileBlock implements Cacheable {
       sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
     }
 
-    int cksumBytes = totalChecksumBytes();
-    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
-    if (dup.limit() != expectedBufLimit) {
-      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
+    if (dup.limit() != onDiskDataSizeWithHeader) {
+      throw new AssertionError(
+        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
     }
 
     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
    // block's header, so there are two sensible values for buffer capacity.
     int hdrSize = headerSize();
     dup.rewind();
-    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
+    if (
+      dup.remaining() != onDiskDataSizeWithHeader
+        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
+    ) {
       throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
-        + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
+        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
     }
   }
 
@@ -641,12 +633,13 @@ public class HFileBlock implements Cacheable {
         ? reader.getBlockDecodingContext()
         : reader.getDefaultBlockDecodingContext();
       // Create a duplicated buffer without the header part.
+      int headerSize = this.headerSize();
       ByteBuff dup = this.buf.duplicate();
-      dup.position(this.headerSize());
+      dup.position(headerSize);
       dup = dup.slice();
       // Decode the dup into unpacked#buf
-      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
+        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
       succ = true;
       return unpacked;
     } finally {
@@ -661,9 +654,8 @@ public class HFileBlock implements Cacheable {
    * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
    */
   private ByteBuff allocateBufferForUnpacking() {
-    int cksumBytes = totalChecksumBytes();
     int headerSize = headerSize();
-    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;
 
     ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
@@ -682,9 +674,8 @@ public class HFileBlock implements Cacheable {
    * calculated heuristic, not tracked attribute of the block.
    */
   public boolean isUnpacked() {
-    final int cksumBytes = totalChecksumBytes();
     final int headerSize = headerSize();
-    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
     final int bufCapacity = buf.remaining();
     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
   }
@@ -1709,6 +1700,9 @@ public class HFileBlock implements Cacheable {
       if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
         return null;
       }
+      // remove checksum from buffer now that it's verified
+      int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
+      curBlock.limit(sizeWithoutChecksum);
       long duration = EnvironmentEdgeManager.currentTime() - startTime;
       if (updateMetrics) {
         HFile.updateReadLatency(duration, pread);
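
The net effect of the HFileBlock.java hunks above is that a block's backing buffer never carries checksum bytes once validation has passed. As a hedged illustration only (not part of the patch), the sketch below spells out the resulting invariant using the same accessors this patch already relies on; it assumes it would live in the org.apache.hadoop.hbase.io.hfile package, like the tests that follow, so every accessor used here is visible.

static void assertChecksumFreeBuffers(HFileBlock packed, HFileBlock unpacked) {
  // Packed (as read from disk): the buffer is trimmed to onDiskDataSizeWithHeader, so the
  // region exposed by getBufferWithoutHeader() is header-free and checksum-free data only.
  assert packed.getBufferWithoutHeader().limit()
      == packed.getOnDiskDataSizeWithHeader() - packed.headerSize();

  // Unpacked: capacity is exactly header + uncompressed data, which is what isUnpacked()
  // now checks; no slack is reserved for checksums any more.
  assert unpacked.isUnpacked();
  assert unpacked.getBufferWithoutHeader().limit() == unpacked.getUncompressedSizeWithoutHeader();
}
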
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index 2c597b18943..baaefb0c0f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -149,11 +149,13 @@
       HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
       assertTrue(!b.isSharedMem());
 
+      ByteBuff bufferWithChecksum = getBufferWithChecksum(b);
+
       // verify SingleByteBuff checksum.
-      verifySBBCheckSum(b.getBufferReadOnly());
+      verifySBBCheckSum(bufferWithChecksum);
 
       // verify MultiByteBuff checksum.
-      verifyMBBCheckSum(b.getBufferReadOnly());
+      verifyMBBCheckSum(bufferWithChecksum);
 
       ByteBuff data = b.getBufferWithoutHeader();
       for (int i = 0; i < intCount; i++) {
@@ -169,6 +171,28 @@
       }
     }
 
+  /**
+   * HFileBlock buffer does not include checksum because it is discarded after verifying upon
+   * reading from disk. We artificially add a checksum onto the buffer for use in testing that
+   * ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
+   * {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
+   */
+  private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
+    ByteBuff buf = block.getBufferReadOnly();
+
+    int numBytes =
+      (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
+    byte[] checksum = new byte[numBytes];
+    ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
+      block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());
+
+    ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
+    bufWithChecksum.put(buf.array(), 0, buf.limit());
+    bufWithChecksum.put(checksum);
+
+    return bufWithChecksum.rewind();
+  }
+
   /**
    * Introduce checksum failures and check that we can still read the data
    */
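
As a rough companion to getBufferWithChecksum() above, the hedged sketch below (not part of the patch) sizes the checksum region that the reader now validates and discards. The helper name is made up; ChecksumUtil.numBytes, getOnDiskDataSizeWithHeader and getHFileContext are the same accessors the test uses. With the default 16 KB bytesPerChecksum and 4-byte CRC chunks, a small block carries just a single 4-byte checksum chunk on disk.

static long strippedChecksumBytes(HFileBlock block) {
  // After this change the block's buffer holds exactly onDiskDataSizeWithHeader bytes
  // (header + data); the checksums that covered that range were verified and dropped.
  int dataSize = block.getOnDiskDataSizeWithHeader();
  int bytesPerChecksum = block.getHFileContext().getBytesPerChecksum();
  return ChecksumUtil.numBytes(dataSize, bytesPerChecksum);
}

For a block written with HBase checksums enabled, getOnDiskSizeWithHeader() should exceed getOnDiskDataSizeWithHeader() by exactly this amount.
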
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 467f32a266b..33788b692b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -652,14 +652,13 @@
           // verifies that the unpacked value read back off disk matches the unpacked value
           // generated before writing to disk.
           HFileBlock newBlock = b.unpack(meta, hbr);
-          // b's buffer has header + data + checksum while
-          // expectedContents have header + data only
+          // neither b's unpacked nor the expectedContents have checksum.
+          // they should be identical
           ByteBuff bufRead = newBlock.getBufferReadOnly();
           ByteBuffer bufExpected = expectedContents.get(i);
-          byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
-          bufRead.get(tmp, 0, tmp.length);
-          boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
-            bufExpected.arrayOffset(), bufExpected.limit()) == 0;
+          byte[] bytesRead = bufRead.toBytes();
+          boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length,
+            bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0;
 
           String wrongBytesMsg = "";
           if (!bytesAreCorrect) {
@@ -669,8 +668,7 @@
               + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n";
             wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), bufExpected.arrayOffset(),
               Math.min(32 + 10, bufExpected.limit())) + ", actual:\n"
-              + Bytes.toStringBinary(bufRead.array(), bufRead.arrayOffset(),
-                Math.min(32 + 10, bufRead.limit()));
+              + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length));
             if (detailedLogging) {
               LOG.warn(
                 "expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected))
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
index bd005802f48..a8a32595ebd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,11 +33,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -71,7 +76,62 @@
     Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
     conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
     allocator = ByteBuffAllocator.create(conf, true);
+  }
 
+  /**
+   * It's important that if you read and unpack the same HFileBlock twice, it results in an
+   * identical buffer each time. Otherwise we end up with validation failures in block cache, since
+   * contents may not match if the same block is cached twice. See
+   * https://issues.apache.org/jira/browse/HBASE-27053
+   */
+  @Test
+  public void itUnpacksIdenticallyEachTime() throws IOException {
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize = createTestBlock(path);
+
+    // Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
+    // buffers to choose from when allocating itself.
+    Random random = new Random();
+    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
+    List<ByteBuff> buffs = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
+      random.nextBytes(temp);
+      buff.put(temp);
+      buffs.add(buff);
+    }
+
+    buffs.forEach(ByteBuff::release);
+
+    // read the same block twice. we should expect the underlying buffer below to
+    // be identical each time
+    HFileBlockWrapper blockOne = readBlock(path, totalSize);
+    HFileBlockWrapper blockTwo = readBlock(path, totalSize);
+
+    // first check size fields
+    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
+      blockTwo.original.getOnDiskSizeWithHeader());
+    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
+      blockTwo.original.getUncompressedSizeWithoutHeader());
+
+    // next check packed buffers
+    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
+      blockTwo.original.getBufferWithoutHeader(),
+      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());
+
+    // now check unpacked buffers. prior to HBASE-27053, this would fail because
+    // the unpacked buffer would include extra space for checksums at the end that was not written.
+    // so the checksum space would be filled with random junk when re-using pooled buffers.
+    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
+      blockTwo.unpacked.getBufferWithoutHeader(),
+      blockOne.original.getUncompressedSizeWithoutHeader());
+  }
+
+  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
+    assertEquals(expectedSize, bufferOne.limit());
+    assertEquals(expectedSize, bufferTwo.limit());
+    assertEquals(0,
+      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
   }
 
   /**
@@ -83,15 +143,66 @@
    */
   @Test
   public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
-    Configuration conf = TEST_UTIL.getConfiguration();
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize = createTestBlock(path);
+    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);
+
+    assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked());
+    assertFalse("expected hfile block to NOT use shared memory",
+      blockFromHFile.original.isSharedMem());
+
+    assertTrue(
+      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
+        + " to be less than " + MIN_ALLOCATION_SIZE,
+      blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
+    assertTrue(
+      "expected generated block uncompressed size "
+        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
+        + MIN_ALLOCATION_SIZE,
+      blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
+
+    assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked());
+    assertTrue("expected unpacked block to use shared memory",
+      blockFromHFile.unpacked.isSharedMem());
+  }
+
+  private final static class HFileBlockWrapper {
+    private final HFileBlock original;
+    private final HFileBlock unpacked;
+
+    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
+      this.original = original;
+      this.unpacked = unpacked;
+    }
+  }
+
+  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
+    try (FSDataInputStream is = fs.open(path)) {
+      HFileContext meta =
+        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
+          .withIncludesMvcc(false).withIncludesTags(false).build();
+      ReaderContext context =
+        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
+      HFileBlock.FSReaderImpl hbr =
+        new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
+      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
+      hbr.setIncludesMemStoreTS(false);
+      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
+      blockFromHFile.sanityCheck();
+      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
+    }
+  }
+
+  private int createTestBlock(Path path) throws IOException {
     HFileContext meta =
       new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
         .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
-    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
     int totalSize;
     try (FSDataOutputStream os = fs.create(path)) {
-      HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
+      HFileBlock.Writer hbw =
+        new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
       hbw.startWriting(BlockType.DATA);
       writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
       hbw.writeHeaderAndData(os);
@@ -100,36 +211,7 @@
         "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
         totalSize < MIN_ALLOCATION_SIZE);
     }
-
-    try (FSDataInputStream is = fs.open(path)) {
-      meta =
-        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
-          .withIncludesMvcc(false).withIncludesTags(false).build();
-      ReaderContext context =
-        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
-          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
-      HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
-      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
-      hbr.setIncludesMemStoreTS(false);
-      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
-      blockFromHFile.sanityCheck();
-      assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
-      assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
-
-      assertTrue(
-        "expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
-          + " to be less than " + MIN_ALLOCATION_SIZE,
-        blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
-      assertTrue(
-        "expected generated block uncompressed size "
-          + blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
-          + MIN_ALLOCATION_SIZE,
-        blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
-
-      HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
-      assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
-      assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
-    }
+    return totalSize;
   }
 
   static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
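
Taken together, the reader-side trim and the new itUnpacksIdenticallyEachTime test are what make re-caching the same block safe. The fragment below is an illustrative sketch only, not part of the patch: readAndUnpack is a hypothetical helper standing in for readBlockData(...) followed by unpack(...) (what readBlock does above), and path/totalSize are assumed to come from something like createTestBlock; only ByteBuff.compareTo and the HFileBlock accessors are taken from this patch.

HFileBlock first = readAndUnpack(path, totalSize);   // hypothetical helper: read + unpack
HFileBlock second = readAndUnpack(path, totalSize);  // the same on-disk block, read again

ByteBuff a = first.getBufferWithoutHeader();
ByteBuff b = second.getBufferWithoutHeader();
// Before this change the two buffers could differ in an unwritten checksum tail inherited
// from recycled pooled memory, so a byte-wise re-cache validation could fail spuriously.
// After this change both buffers end at uncompressedSizeWithoutHeader and compare equal.
assert ByteBuff.compareTo(a, 0, a.limit(), b, 0, b.limit()) == 0;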