HBASE-27053 IOException during caching of uncompressed block to the block cache (#4610)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Reviewed-by: wenwj0 <wenweijian2@huawei.com>
Author: Bryan Beaudreault (2022-07-16 17:30:55 -04:00), committed by GitHub
parent 02f26368e2
commit 70a2ee1716
4 changed files with 165 additions and 67 deletions
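
In short: before this change, HFileBlock reserved space for checksum bytes in the unpacked (decompressed) buffer but never wrote them, so when a previously used pooled buffer was allocated for unpacking, the trailing "checksum" region held leftover garbage. Two reads of the same block could therefore produce unequal buffers, tripping block-cache validation and surfacing as the IOException in the subject. Below is a minimal, self-contained sketch of that failure mode using plain java.nio; it is not HBase code, and names like unpackInto and the 4-byte tail are invented for the example.

import java.nio.ByteBuffer;
import java.util.Arrays;

// Toy illustration (not HBase code): if an "unpacked" buffer reserves room for
// checksum bytes that are never written, a reused (dirty) pooled buffer leaves
// junk in that tail, so two unpacks of the same block can compare unequal.
public class UnwrittenTailDemo {
  static ByteBuffer unpackInto(ByteBuffer pooled, byte[] payload, int reservedTail) {
    pooled.clear();
    pooled.put(payload);                        // only the payload is written
    pooled.position(0);
    pooled.limit(payload.length + reservedTail); // tail bytes stay whatever the pool left behind
    return pooled.slice();
  }

  public static void main(String[] args) {
    byte[] payload = "same block payload".getBytes();
    int tail = 4; // stand-in for the reserved checksum bytes

    ByteBuffer dirtyA = ByteBuffer.allocate(64);                // zero-filled fresh buffer
    ByteBuffer dirtyB = ByteBuffer.allocate(64);
    Arrays.fill(dirtyB.array(), (byte) 0x5A);                   // simulate a reused pooled buffer

    ByteBuffer first = unpackInto(dirtyA, payload, tail);
    ByteBuffer second = unpackInto(dirtyB, payload, tail);
    System.out.println("with reserved tail, equal? " + first.equals(second));   // false

    ByteBuffer firstFixed = unpackInto(dirtyA, payload, 0);
    ByteBuffer secondFixed = unpackInto(dirtyB, payload, 0);
    System.out.println("without reserved tail, equal? " + firstFixed.equals(secondFixed)); // true
  }
}

The fix below takes the second route: checksum bytes are dropped from the block buffer right after verification, so the unpacked buffer no longer reserves an unwritten tail.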

HFileBlock.java

@@ -493,18 +493,8 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuff getBufferWithoutHeader() {
-    return this.getBufferWithoutHeader(false);
-  }
-
-  /**
-   * Returns a buffer that does not include the header or checksum.
-   * @param withChecksum to indicate whether include the checksum or not.
-   * @return the buffer with header skipped and checksum omitted.
-   */
-  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
     ByteBuff dup = getBufferReadOnly();
-    int delta = withChecksum ? 0 : totalChecksumBytes();
-    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
+    return dup.position(headerSize()).slice();
   }

   /**
@@ -568,19 +558,21 @@ public class HFileBlock implements Cacheable {
       sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
     }

-    int cksumBytes = totalChecksumBytes();
-    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
-    if (dup.limit() != expectedBufLimit) {
-      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
+    if (dup.limit() != onDiskDataSizeWithHeader) {
+      throw new AssertionError(
+        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
     }

     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
     // block's header, so there are two sensible values for buffer capacity.
     int hdrSize = headerSize();
     dup.rewind();
-    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
+    if (
+      dup.remaining() != onDiskDataSizeWithHeader
+        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
+    ) {
       throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
-        + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
+        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
     }
   }
@@ -641,12 +633,13 @@ public class HFileBlock implements Cacheable {
         ? reader.getBlockDecodingContext()
         : reader.getDefaultBlockDecodingContext();
       // Create a duplicated buffer without the header part.
+      int headerSize = this.headerSize();
       ByteBuff dup = this.buf.duplicate();
-      dup.position(this.headerSize());
+      dup.position(headerSize);
       dup = dup.slice();
       // Decode the dup into unpacked#buf
-      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
+        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
       succ = true;
       return unpacked;
     } finally {
@@ -661,9 +654,8 @@ public class HFileBlock implements Cacheable {
    * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
    */
   private ByteBuff allocateBufferForUnpacking() {
-    int cksumBytes = totalChecksumBytes();
     int headerSize = headerSize();
-    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;

     ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
@@ -682,9 +674,8 @@ public class HFileBlock implements Cacheable {
    * calculated heuristic, not tracked attribute of the block.
    */
   public boolean isUnpacked() {
-    final int cksumBytes = totalChecksumBytes();
     final int headerSize = headerSize();
-    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
     final int bufCapacity = buf.remaining();
     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
   }
@@ -1709,6 +1700,9 @@ public class HFileBlock implements Cacheable {
       if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
         return null;
       }
+      // remove checksum from buffer now that it's verified
+      int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
+      curBlock.limit(sizeWithoutChecksum);
       long duration = EnvironmentEdgeManager.currentTime() - startTime;
       if (updateMetrics) {
         HFile.updateReadLatency(duration, pread);
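
To make the read path concrete: the added lines above trim the buffer's limit to the on-disk data size recorded in the block's own header as soon as the checksum has been verified, so everything downstream (getBufferWithoutHeader, unpack, caching) sees a checksum-free buffer. A rough standalone sketch of the same idea with plain java.nio follows; the 4-byte size field at offset 0 and the overall layout are invented for illustration and do not match HBase's real block header.

import java.nio.ByteBuffer;

// Illustrative only (plain java.nio, hypothetical layout): once the checksum has been
// verified, the block buffer can be truncated to the size recorded in its own header,
// so the checksum bytes never leak into cached or unpacked copies.
public class TrimChecksumDemo {
  // hypothetical header layout: a 4-byte size field at offset 0
  static final int ON_DISK_DATA_SIZE_INDEX = 0;

  static void trimVerifiedBlock(ByteBuffer block) {
    int sizeWithoutChecksum = block.getInt(ON_DISK_DATA_SIZE_INDEX);
    block.limit(sizeWithoutChecksum); // bytes beyond this point (the checksum) are dropped
  }

  public static void main(String[] args) {
    // build a fake "block": 4-byte size field, 8 bytes of data, 4 bytes of checksum
    ByteBuffer block = ByteBuffer.allocate(16);
    block.putInt(12);            // size of size field + data, excluding checksum
    block.put(new byte[8]);      // data
    block.putInt(0xCAFEBABE);    // pretend checksum
    block.flip();

    trimVerifiedBlock(block);
    System.out.println("remaining after trim: " + block.remaining()); // 12
  }
}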

TestChecksum.java

@@ -149,11 +149,13 @@ public class TestChecksum {
       HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
       assertTrue(!b.isSharedMem());

+      ByteBuff bufferWithChecksum = getBufferWithChecksum(b);
+
       // verify SingleByteBuff checksum.
-      verifySBBCheckSum(b.getBufferReadOnly());
+      verifySBBCheckSum(bufferWithChecksum);

       // verify MultiByteBuff checksum.
-      verifyMBBCheckSum(b.getBufferReadOnly());
+      verifyMBBCheckSum(bufferWithChecksum);

       ByteBuff data = b.getBufferWithoutHeader();
       for (int i = 0; i < intCount; i++) {
@ -169,6 +171,28 @@ public class TestChecksum {
} }
} }
/**
* HFileBlock buffer does not include checksum because it is discarded after verifying upon
* reading from disk. We artificially add a checksum onto the buffer for use in testing that
* ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
* {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
*/
private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
ByteBuff buf = block.getBufferReadOnly();
int numBytes =
(int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
byte[] checksum = new byte[numBytes];
ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());
ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
bufWithChecksum.put(buf.array(), 0, buf.limit());
bufWithChecksum.put(checksum);
return bufWithChecksum.rewind();
}
/** /**
* Introduce checksum failures and check that we can still read the data * Introduce checksum failures and check that we can still read the data
*/ */

TestHFileBlock.java

@@ -652,14 +652,13 @@ public class TestHFileBlock {
         // verifies that the unpacked value read back off disk matches the unpacked value
         // generated before writing to disk.
         HFileBlock newBlock = b.unpack(meta, hbr);
-        // b's buffer has header + data + checksum while
-        // expectedContents have header + data only
+        // neither b's unpacked nor the expectedContents have checksum.
+        // they should be identical
         ByteBuff bufRead = newBlock.getBufferReadOnly();
         ByteBuffer bufExpected = expectedContents.get(i);
-        byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
-        bufRead.get(tmp, 0, tmp.length);
-        boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
-          bufExpected.arrayOffset(), bufExpected.limit()) == 0;
+        byte[] bytesRead = bufRead.toBytes();
+        boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length,
+          bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0;

         String wrongBytesMsg = "";
         if (!bytesAreCorrect) {
@@ -669,8 +668,7 @@ public class TestHFileBlock {
             + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n";
           wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
             bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n"
-            + Bytes.toStringBinary(bufRead.array(), bufRead.arrayOffset(),
-              Math.min(32 + 10, bufRead.limit()));
+            + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length));
           if (detailedLogging) {
             LOG.warn(
               "expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected))

TestHFileBlockUnpack.java

@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.io.hfile;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,11 +33,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@ -71,7 +76,62 @@ public class TestHFileBlockUnpack {
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE); conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
allocator = ByteBuffAllocator.create(conf, true); allocator = ByteBuffAllocator.create(conf, true);
}
/**
* It's important that if you read and unpack the same HFileBlock twice, it results in an
* identical buffer each time. Otherwise we end up with validation failures in block cache, since
* contents may not match if the same block is cached twice. See
* https://issues.apache.org/jira/browse/HBASE-27053
*/
@Test
public void itUnpacksIdenticallyEachTime() throws IOException {
Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
int totalSize = createTestBlock(path);
// Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
// buffers to choose from when allocating itself.
Random random = new Random();
byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
List<ByteBuff> buffs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
random.nextBytes(temp);
buff.put(temp);
buffs.add(buff);
}
buffs.forEach(ByteBuff::release);
// read the same block twice. we should expect the underlying buffer below to
// be identical each time
HFileBlockWrapper blockOne = readBlock(path, totalSize);
HFileBlockWrapper blockTwo = readBlock(path, totalSize);
// first check size fields
assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
blockTwo.original.getOnDiskSizeWithHeader());
assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
blockTwo.original.getUncompressedSizeWithoutHeader());
// next check packed buffers
assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
blockTwo.original.getBufferWithoutHeader(),
blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());
// now check unpacked buffers. prior to HBASE-27053, this would fail because
// the unpacked buffer would include extra space for checksums at the end that was not written.
// so the checksum space would be filled with random junk when re-using pooled buffers.
assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
blockTwo.unpacked.getBufferWithoutHeader(),
blockOne.original.getUncompressedSizeWithoutHeader());
}
private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
assertEquals(expectedSize, bufferOne.limit());
assertEquals(expectedSize, bufferTwo.limit());
assertEquals(0,
ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
} }
/** /**
@@ -83,15 +143,66 @@ public class TestHFileBlockUnpack {
    */
   @Test
   public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
-    Configuration conf = TEST_UTIL.getConfiguration();
+    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
+    int totalSize = createTestBlock(path);
+    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);
+
+    assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked());
+    assertFalse("expected hfile block to NOT use shared memory",
+      blockFromHFile.original.isSharedMem());
+
+    assertTrue(
+      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
+        + " to be less than " + MIN_ALLOCATION_SIZE,
+      blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
+    assertTrue(
+      "expected generated block uncompressed size "
+        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
+        + MIN_ALLOCATION_SIZE,
+      blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
+
+    assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked());
+    assertTrue("expected unpacked block to use shared memory",
+      blockFromHFile.unpacked.isSharedMem());
+  }
+
+  private final static class HFileBlockWrapper {
+    private final HFileBlock original;
+    private final HFileBlock unpacked;
+
+    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
+      this.original = original;
+      this.unpacked = unpacked;
+    }
+  }
+
+  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
+    try (FSDataInputStream is = fs.open(path)) {
+      HFileContext meta =
+        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
+          .withIncludesMvcc(false).withIncludesTags(false).build();
+      ReaderContext context =
+        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
+      HFileBlock.FSReaderImpl hbr =
+        new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
+      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
+      hbr.setIncludesMemStoreTS(false);
+      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
+      blockFromHFile.sanityCheck();
+      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
+    }
+  }
+
+  private int createTestBlock(Path path) throws IOException {
     HFileContext meta =
       new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
         .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
-    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
     int totalSize;
     try (FSDataOutputStream os = fs.create(path)) {
-      HFileBlock.Writer hbw = new HFileBlock.Writer(conf, NoOpDataBlockEncoder.INSTANCE, meta);
+      HFileBlock.Writer hbw =
+        new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
       hbw.startWriting(BlockType.DATA);
       writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
       hbw.writeHeaderAndData(os);
@@ -100,36 +211,7 @@ public class TestHFileBlockUnpack {
         "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
         totalSize < MIN_ALLOCATION_SIZE);
     }
-
-    try (FSDataInputStream is = fs.open(path)) {
-      meta =
-        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
-          .withIncludesMvcc(false).withIncludesTags(false).build();
-      ReaderContext context =
-        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
-          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
-      HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, allocator, conf);
-      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, conf);
-      hbr.setIncludesMemStoreTS(false);
-      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
-      blockFromHFile.sanityCheck();
-      assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.isUnpacked());
-      assertFalse("expected hfile block to NOT use shared memory", blockFromHFile.isSharedMem());
-      assertTrue(
-        "expected generated block size " + blockFromHFile.getOnDiskSizeWithHeader()
-          + " to be less than " + MIN_ALLOCATION_SIZE,
-        blockFromHFile.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
-      assertTrue(
-        "expected generated block uncompressed size "
-          + blockFromHFile.getUncompressedSizeWithoutHeader() + " to be more than "
-          + MIN_ALLOCATION_SIZE,
-        blockFromHFile.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);
-      HFileBlock blockUnpacked = blockFromHFile.unpack(meta, hbr);
-      assertTrue("expected unpacked block to be unpacked", blockUnpacked.isUnpacked());
-      assertTrue("expected unpacked block to use shared memory", blockUnpacked.isSharedMem());
-    }
+    return totalSize;
   }

   static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {