From e4ce632d598911dd0dcabeed81aee370a273be1b Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 16 Jul 2022 17:30:55 -0400 Subject: [PATCH] HubSpot Backport: HBASE-27053 IOException during caching of uncompressed block to the block cache (#4610) Signed-off-by: Duo Zhang Reviewed-by: wenwj0 --- .../hadoop/hbase/io/hfile/HFileBlock.java | 48 ++-- .../hadoop/hbase/io/hfile/TestChecksum.java | 31 ++- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 17 +- .../hbase/io/hfile/TestHFileBlockUnpack.java | 249 ++++++++++++++++++ 4 files changed, 304 insertions(+), 41 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 07f8f81e810..97c15ded753 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -489,18 +489,8 @@ public class HFileBlock implements Cacheable { * @return the buffer with header skipped and checksum omitted. */ public ByteBuff getBufferWithoutHeader() { - return this.getBufferWithoutHeader(false); - } - - /** - * Returns a buffer that does not include the header or checksum. - * @param withChecksum to indicate whether include the checksum or not. - * @return the buffer with header skipped and checksum omitted. - */ - public ByteBuff getBufferWithoutHeader(boolean withChecksum) { ByteBuff dup = getBufferReadOnly(); - int delta = withChecksum ? 0 : totalChecksumBytes(); - return dup.position(headerSize()).limit(buf.limit() - delta).slice(); + return dup.position(headerSize()).slice(); } /** @@ -566,19 +556,21 @@ public class HFileBlock implements Cacheable { sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } - int cksumBytes = totalChecksumBytes(); - int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; - if (dup.limit() != expectedBufLimit) { - throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit()); + if (dup.limit() != onDiskDataSizeWithHeader) { + throw new AssertionError( + "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next // block's header, so there are two sensible values for buffer capacity. int hdrSize = headerSize(); dup.rewind(); - if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) { - throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + - ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); + if ( + dup.remaining() != onDiskDataSizeWithHeader + && dup.remaining() != onDiskDataSizeWithHeader + hdrSize + ) { + throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected " + + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize)); } } @@ -636,19 +628,20 @@ public class HFileBlock implements Cacheable { return this; } - ByteBuff newBuf = allocateBuffer(); // allocates space for the decompressed block + ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block HFileBlock unpacked = shallowClone(this, newBuf); boolean succ = false; try { HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? 
reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); // Create a duplicated buffer without the header part. + int headerSize = this.headerSize(); ByteBuff dup = this.buf.duplicate(); - dup.position(this.headerSize()); + dup.position(headerSize); dup = dup.slice(); // Decode the dup into unpacked#buf - ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), - unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup); + ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize, + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); succ = true; return unpacked; } finally { @@ -663,10 +656,9 @@ public class HFileBlock implements Cacheable { * from the existing buffer. Does not change header fields. * Reserve room to keep checksum bytes too. */ - private ByteBuff allocateBuffer() { - int cksumBytes = totalChecksumBytes(); + private ByteBuff allocateBufferForUnpacking() { int headerSize = headerSize(); - int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader; ByteBuff source = buf.duplicate(); ByteBuff newBuf = allocator.allocate(capacityNeeded); @@ -685,9 +677,8 @@ public class HFileBlock implements Cacheable { * calculated heuristic, not tracked attribute of the block. */ public boolean isUnpacked() { - final int cksumBytes = totalChecksumBytes(); final int headerSize = headerSize(); - final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; + final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader; final int bufCapacity = buf.remaining(); return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; } @@ -1755,6 +1746,9 @@ public class HFileBlock implements Cacheable { if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { return null; } + // remove checksum from buffer now that it's verified + int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX); + curBlock.limit(sizeWithoutChecksum); long duration = System.currentTimeMillis() - startTime; if (updateMetrics) { HFile.updateReadLatency(duration, pread); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 85f74c90614..13352566c76 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -162,11 +162,13 @@ public class TestChecksum { HFileBlock b = hbr.readBlockData(0, -1, false, false, true); assertTrue(!b.isSharedMem()); + ByteBuff bufferWithChecksum = getBufferWithChecksum(b); + // verify SingleByteBuff checksum. - verifySBBCheckSum(b.getBufferReadOnly()); + verifySBBCheckSum(bufferWithChecksum); // verify MultiByteBuff checksum. - verifyMBBCheckSum(b.getBufferReadOnly()); + verifyMBBCheckSum(bufferWithChecksum); ByteBuff data = b.getBufferWithoutHeader(); for (int i = 0; i < intCount; i++) { @@ -183,8 +185,29 @@ public class TestChecksum { } /** - * Introduce checksum failures and check that we can still read - * the data + * HFileBlock buffer does not include checksum because it is discarded after verifying upon + * reading from disk. 
We artificially add a checksum onto the buffer for use in testing that + * ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in + * {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)} + */ + private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException { + ByteBuff buf = block.getBufferReadOnly(); + + int numBytes = + (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum()); + byte[] checksum = new byte[numBytes]; + ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0, + block.getHFileContext().getChecksumType(), block.getBytesPerChecksum()); + + ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes); + bufWithChecksum.put(buf.array(), 0, buf.limit()); + bufWithChecksum.put(checksum); + + return bufWithChecksum.rewind(); + } + + /** + * Introduce checksum failures and check that we can still read the data */ @Test public void testChecksumCorruption() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 2da64e26b74..e1694c52fd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -687,14 +687,13 @@ public class TestHFileBlock { // verifies that the unpacked value read back off disk matches the unpacked value // generated before writing to disk. HFileBlock newBlock = b.unpack(meta, hbr); - // b's buffer has header + data + checksum while - // expectedContents have header + data only + // neither b's unpacked nor the expectedContents have checksum. + // they should be identical ByteBuff bufRead = newBlock.getBufferReadOnly(); ByteBuffer bufExpected = expectedContents.get(i); - byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()]; - bufRead.get(tmp, 0, tmp.length); - boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(), - bufExpected.arrayOffset(), bufExpected.limit()) == 0; + byte[] bytesRead = bufRead.toBytes(); + boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length, + bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0; String wrongBytesMsg = ""; if (!bytesAreCorrect) { @@ -704,10 +703,8 @@ public class TestHFileBlock { + algo + ", pread=" + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n"; wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), - bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) - + ", actual:\n" - + Bytes.toStringBinary(bufRead.array(), - bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit())); + bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n" + + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length)); if (detailedLogging) { LOG.warn("expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java new file mode 100644 index 00000000000..a61ead7850d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockUnpack.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ IOTests.class, MediumTests.class }) +public class TestHFileBlockUnpack { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHFileBlockUnpack.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + // repetition gives us some chance to get a good compression ratio + private static float CHANCE_TO_REPEAT = 0.6f; + + private static final int MIN_ALLOCATION_SIZE = 10 * 1024; + + ByteBuffAllocator allocator; + + @Rule + public TestName name = new TestName(); + private FileSystem fs; + + @Before + public void setUp() throws Exception { + fs = HFileSystem.get(TEST_UTIL.getConfiguration()); + Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE); + allocator = ByteBuffAllocator.create(conf, true); + } + + /** + * It's important that if you read and unpack the same HFileBlock twice, it results in an + * identical buffer each time. Otherwise we end up with validation failures in block cache, since + * contents may not match if the same block is cached twice. 
See
+   * https://issues.apache.org/jira/browse/HBASE-27053
+   */
+  @Test
+  public void itUnpacksIdenticallyEachTime() throws IOException {
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize = createTestBlock(path);
+
+    // Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
+    // buffers to choose from when allocating itself.
+    Random random = new Random();
+    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
+    List<ByteBuff> buffs = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
+      random.nextBytes(temp);
+      buff.put(temp);
+      buffs.add(buff);
+    }
+
+    buffs.forEach(ByteBuff::release);
+
+    // read the same block twice. we should expect the underlying buffer below to
+    // be identical each time
+    HFileBlockWrapper blockOne = readBlock(path, totalSize);
+    HFileBlockWrapper blockTwo = readBlock(path, totalSize);
+
+    // first check size fields
+    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
+      blockTwo.original.getOnDiskSizeWithHeader());
+    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
+      blockTwo.original.getUncompressedSizeWithoutHeader());
+
+    // next check packed buffers
+    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
+      blockTwo.original.getBufferWithoutHeader(),
+      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());
+
+    // now check unpacked buffers. prior to HBASE-27053, this would fail because
+    // the unpacked buffer would include extra space for checksums at the end that was not written.
+    // so the checksum space would be filled with random junk when re-using pooled buffers.
+    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
+      blockTwo.unpacked.getBufferWithoutHeader(),
+      blockOne.original.getUncompressedSizeWithoutHeader());
+  }
+
+  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
+    assertEquals(expectedSize, bufferOne.limit());
+    assertEquals(expectedSize, bufferTwo.limit());
+    assertEquals(0,
+      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
+  }
+
+  /**
+   * If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
+   * block will be allocated to heap regardless of desire for off-heap. After de-compressing the
+   * block, the new size may now exceed the min allocation size.
This test ensures that those + * de-compressed blocks, which will be allocated off-heap, are properly marked as + * {@link HFileBlock#isSharedMem()} == true See https://issues.apache.org/jira/browse/HBASE-27170 + */ + @Test + public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException { + Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName()); + int totalSize = createTestBlock(path); + HFileBlockWrapper blockFromHFile = readBlock(path, totalSize); + + assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked()); + assertFalse("expected hfile block to NOT use shared memory", + blockFromHFile.original.isSharedMem()); + + assertTrue( + "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader() + + " to be less than " + MIN_ALLOCATION_SIZE, + blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE); + assertTrue( + "expected generated block uncompressed size " + + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than " + + MIN_ALLOCATION_SIZE, + blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE); + + assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked()); + assertTrue("expected unpacked block to use shared memory", + blockFromHFile.unpacked.isSharedMem()); + } + + private final static class HFileBlockWrapper { + private final HFileBlock original; + private final HFileBlock unpacked; + + private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) { + this.original = original; + this.unpacked = unpacked; + } + } + + private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException { + try (FSDataInputStream is = fs.open(path)) { + HFileContext meta = + new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ) + .withIncludesMvcc(false).withIncludesTags(false).build(); + ReaderContext context = + new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) + .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); + HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator); + hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE); + hbr.setIncludesMemStoreTS(false); + HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false); + blockFromHFile.sanityCheck(); + return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr)); + } + } + + private int createTestBlock(Path path) throws IOException { + HFileContext meta = + new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false) + .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); + + int totalSize; + try (FSDataOutputStream os = fs.create(path)) { + HFileBlock.Writer hbw = new HFileBlock.Writer(NoOpDataBlockEncoder.INSTANCE, meta, allocator); + hbw.startWriting(BlockType.DATA); + writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1); + hbw.writeHeaderAndData(os); + totalSize = hbw.getOnDiskSizeWithHeader(); + assertTrue( + "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE, + totalSize < MIN_ALLOCATION_SIZE); + } + return totalSize; + } + + static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException { + Random random = new Random(42); + + byte[] family = new byte[] { 1 }; + int rowKey = 0; + int qualifier = 0; + int value = 0; + long timestamp = 0; + + int 
totalSize = 0; + + // go until just up to the limit. compression should bring the total on-disk size under + while (totalSize < desiredSize) { + rowKey = maybeIncrement(random, rowKey); + qualifier = maybeIncrement(random, qualifier); + value = maybeIncrement(random, value); + timestamp = maybeIncrement(random, (int) timestamp); + + KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier), + timestamp, Bytes.toBytes(value)); + hbw.write(keyValue); + totalSize += keyValue.getLength(); + } + + return totalSize; + } + + private static int maybeIncrement(Random random, int value) { + if (random.nextFloat() < CHANCE_TO_REPEAT) { + return value; + } + return value + 1; + } + +}
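
The subtlest part of the reader change is the trio of lines added in HFileBlock.FSReaderImpl: once validateChecksum() passes, the reader re-reads onDiskDataSizeWithHeader out of the block header and shrinks the buffer's limit to it, so checksum bytes never reach unpack() or the block cache. Below is a minimal, self-contained sketch of that limit-trimming idea using plain java.nio.ByteBuffer and CRC32 rather than HBase's ByteBuff/ChecksumUtil; the 8-byte header layout, field offsets, and magic value are invented purely for illustration and are not HBase's real block format.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Toy block layout: [ 8-byte header | payload | 8-byte CRC trailer ].
public class ChecksumTrimSketch {
  public static void main(String[] args) {
    int headerSize = 8;
    byte[] payload = "hello hfile block".getBytes();

    CRC32 writeCrc = new CRC32();
    writeCrc.update(payload, 0, payload.length);

    ByteBuffer block = ByteBuffer.allocate(headerSize + payload.length + Long.BYTES);
    block.putInt(0xDA7ABA5E);                  // pretend block-type magic
    block.putInt(headerSize + payload.length); // header field: size without the checksum trailer
    block.put(payload);
    block.putLong(writeCrc.getValue());        // checksum trailer
    block.flip();

    // 1. Recompute the checksum over the payload and compare it with the trailer.
    int sizeWithoutChecksum = block.getInt(4);
    ByteBuffer payloadView = block.duplicate();
    payloadView.position(headerSize);
    payloadView.limit(sizeWithoutChecksum);
    CRC32 readCrc = new CRC32();
    readCrc.update(payloadView);
    if (readCrc.getValue() != block.getLong(sizeWithoutChecksum)) {
      throw new IllegalStateException("checksum mismatch");
    }

    // 2. Now that it is verified, trim the checksum off by shrinking the limit;
    //    no bytes are copied, downstream readers simply never see the trailer.
    block.limit(sizeWithoutChecksum);

    System.out.println("remaining after trim: " + block.remaining()); // headerSize + payload.length
  }
}

Because the trim is just a limit change, allocateBufferForUnpacking() and isUnpacked() can size buffers as header + uncompressed data with no checksum reservation, which is what keeps two unpacks of the same block byte-identical (the property TestHFileBlockUnpack#itUnpacksIdenticallyEachTime asserts).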