HBASE-27370 Avoid decompressing blocks when reading from bucket cache… (#4781)

Co-authored-by: Josh Elser <elserj@apache.org>
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
Author: Wellington Ramos Chevreuil, 2022-09-20 09:15:17 +01:00 (committed by Wellington Chevreuil)
parent f661976066
commit 3aad87664d
5 changed files with 118 additions and 23 deletions
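A condensed sketch of the new fast path (paraphrasing the HFileReaderImpl change below; this is not the committed code, and surrounding fields such as cacheConf, cacheKey, hfileContext, fsBlockReader and the freshly read hfileBlock are taken from that context):

    // Prefetch ("cacheOnly") fast path: keep the block compressed when the cache
    // is configured to store compressed blocks for this category.
    BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
    final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
    final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
    if (cacheOnly && cacheCompressed && cacheOnRead) {
      // No caller needs the uncompressed bytes, so cache the on-disk (packed) block directly.
      cacheConf.getBlockCache()
        .ifPresent(cache -> cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()));
      return hfileBlock;
    }
    // Otherwise unpack (decompress/decode) as before and cache according to configuration.
    HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);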

org/apache/hadoop/hbase/io/hfile/HFile.java

@@ -373,6 +373,10 @@ public final class HFile {
HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException;
+ HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
+ final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
+ DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException;
}
/** An interface used by clients to open and iterate an {@link HFile}. */

org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java

@@ -57,7 +57,7 @@ public class HFilePreadReader extends HFileReaderImpl {
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
- /* pread= */true, false, false, null, null);
+ /* pread= */true, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();

org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

@@ -1084,7 +1084,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
*/
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
- boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+ boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
BlockCache cache = cacheConf.getBlockCache().orElse(null);
@@ -1189,7 +1189,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
HFileBlock cachedBlock =
- getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null);
+ getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
if (cachedBlock != null) {
assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block,
@@ -1236,6 +1236,15 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+ return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread, isCompaction,
+ updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false);
+ }
+ @Override
+ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
+ boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
+ BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly)
+ throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException(path + " block index not loaded");
}
@@ -1261,17 +1270,18 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
try {
while (true) {
// Check cache for block. If found return.
- if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
+ if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) {
if (useLock) {
lockEntry = offsetLock.getLockEntry(dataBlockOffset);
}
// Try and get the block from the block cache. If the useLock variable is true then this
// is the second time through the loop and it should not be counted as a block cache miss.
- HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
- updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
+ HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
+ expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("From Cache {}", cachedBlock);
LOG.trace("Block for file {} is coming from Cache {}",
Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock);
}
span.addEvent("block cache hit", attributes);
assert cachedBlock.isUnpacked() : "Packed block leak.";
@@ -1308,14 +1318,30 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
validateBlockType(hfileBlock, expectedBlockType);
- HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+ final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
+ final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
+ // Don't need the unpacked block back and we're storing the block in the cache compressed
+ if (cacheOnly && cacheCompressed && cacheOnRead) {
+ LOG.debug("Skipping decompression of block in prefetch");
+ // Cache the block if necessary
+ cacheConf.getBlockCache().ifPresent(cache -> {
+ if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+ cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+ }
+ });
+ if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
+ HFile.DATABLOCK_READ_COUNT.increment();
+ }
+ return hfileBlock;
+ }
+ HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
- cache.cacheBlock(cacheKey,
- cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
+ cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
}
});
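The practical effect for callers: passing the new cacheOnly flag requests cache population only, so the returned block may still be packed (compressed/encoded). A hedged usage sketch mirroring the HFilePreadReader prefetch loop above (loop variables such as offset and onDiskSizeOfNextBlock are assumed from that context):

    // cacheOnly=true: warm the block cache; do not assume the returned block is unpacked.
    HFileBlock block = reader.readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */ true,
      /* pread= */ true, /* isCompaction= */ false, /* updateCacheMetrics= */ false,
      /* expectedBlockType= */ null, /* expectedDataBlockEncoding= */ null, /* cacheOnly= */ true);
    offset += block.getOnDiskSizeWithHeader();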

org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java

@@ -175,6 +175,14 @@ public class TestHFileBlockIndex {
public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+ return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction, updateCacheMetrics,
+ expectedBlockType, expectedDataBlockEncoding, false);
+ }
+ @Override
+ public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
+ boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+ DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException {
if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) {
hitCount += 1;
return prevBlock;

org/apache/hadoop/hbase/io/hfile/TestPrefetch.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
+ import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
@@ -26,6 +27,7 @@ import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
@@ -34,6 +36,8 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+ import java.util.function.BiConsumer;
+ import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,6 +51,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -148,6 +153,51 @@ public class TestPrefetch {
}
private void readStoreFile(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (
+ block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(isCached);
+ }
+ });
+ }
+ private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null, true);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (block.getBlockType() == BlockType.DATA) {
+ assertFalse(block.isUnpacked());
+ } else if (
+ block.getBlockType() == BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(block.isUnpacked());
+ }
+ assertTrue(isCached);
+ });
+ }
+ private void readStoreFile(Path storeFilePath,
+ BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
+ BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
@@ -155,29 +205,36 @@ public class TestPrefetch {
// Sleep for a bit
Thread.sleep(1000);
}
// Check that all of the data blocks were preloaded
BlockCache blockCache = cacheConf.getBlockCache().get();
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
- HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
+ HFileBlock block = readFunction.apply(reader, offset);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
- boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
- if (
- block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
- || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
- ) {
- assertTrue(isCached);
- }
+ validationFunction.accept(blockCacheKey, block);
offset += block.getOnDiskSizeWithHeader();
}
}
+ @Test
+ public void testPrefetchCompressed() throws Exception {
+ conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
+ cacheConf = new CacheConfig(conf, blockCache);
+ HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
+ .withBlockSize(DATA_BLOCK_SIZE).build();
+ Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
+ readStoreFileCacheOnly(storeFile);
+ conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
+ }
private Path writeStoreFile(String fname) throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+ return writeStoreFile(fname, meta);
+ }
+ private Path writeStoreFile(String fname, HFileContext context) throws IOException {
+ Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
- .withOutputDir(storeFileParentDir).withFileContext(meta).build();
+ .withOutputDir(storeFileParentDir).withFileContext(context).build();
Random rand = ThreadLocalRandom.current();
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {