HBASE-27370 Avoid decompressing blocks when reading from bucket cache… (#4781)
Co-authored-by: Josh Elser <elserj@apache.com> Signed-off-by: Peter Somogyi <psomogyi@apache.org> Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
This commit is contained in:
parent
3f5c0a505a
commit
3f4734edae
|
@ -373,6 +373,10 @@ public final class HFile {
|
|||
HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
|
||||
final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
|
||||
DataBlockEncoding expectedDataBlockEncoding) throws IOException;
|
||||
|
||||
HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
|
||||
final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
|
||||
DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException;
|
||||
}
|
||||
|
||||
/** An interface used by clients to open and iterate an {@link HFile}. */
|
||||
|
|
|
@ -57,7 +57,7 @@ public class HFilePreadReader extends HFileReaderImpl {
|
|||
// next header, will not have happened...so, pass in the onDiskSize gotten from the
|
||||
// cached block. This 'optimization' triggers extremely rarely I'd say.
|
||||
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
|
||||
/* pread= */true, false, false, null, null);
|
||||
/* pread= */true, false, false, null, null, true);
|
||||
try {
|
||||
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
|
|
|
@ -1084,7 +1084,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
|
||||
*/
|
||||
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
|
||||
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
|
||||
boolean updateCacheMetrics, BlockType expectedBlockType,
|
||||
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
|
||||
// Check cache for block. If found return.
|
||||
BlockCache cache = cacheConf.getBlockCache().orElse(null);
|
||||
|
@ -1189,7 +1189,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
|
||||
cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
|
||||
HFileBlock cachedBlock =
|
||||
getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null);
|
||||
getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
|
||||
if (cachedBlock != null) {
|
||||
assert cachedBlock.isUnpacked() : "Packed block leak.";
|
||||
// Return a distinct 'shallow copy' of the block,
|
||||
|
@ -1236,6 +1236,15 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
|
||||
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
|
||||
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException {
|
||||
return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread, isCompaction,
|
||||
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
|
||||
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
|
||||
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly)
|
||||
throws IOException {
|
||||
if (dataBlockIndexReader == null) {
|
||||
throw new IOException(path + " block index not loaded");
|
||||
}
|
||||
|
@ -1261,17 +1270,18 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
try {
|
||||
while (true) {
|
||||
// Check cache for block. If found return.
|
||||
if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
|
||||
if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) {
|
||||
if (useLock) {
|
||||
lockEntry = offsetLock.getLockEntry(dataBlockOffset);
|
||||
}
|
||||
// Try and get the block from the block cache. If the useLock variable is true then this
|
||||
// is the second time through the loop and it should not be counted as a block cache miss.
|
||||
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
|
||||
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
|
||||
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
|
||||
expectedBlockType, expectedDataBlockEncoding);
|
||||
if (cachedBlock != null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("From Cache {}", cachedBlock);
|
||||
LOG.trace("Block for file {} is coming from Cache {}",
|
||||
Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock);
|
||||
}
|
||||
span.addEvent("block cache hit", attributes);
|
||||
assert cachedBlock.isUnpacked() : "Packed block leak.";
|
||||
|
@ -1308,14 +1318,30 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
|
||||
!isCompaction, shouldUseHeap(expectedBlockType));
|
||||
validateBlockType(hfileBlock, expectedBlockType);
|
||||
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
|
||||
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
|
||||
final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
|
||||
final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
|
||||
|
||||
// Don't need the unpacked block back and we're storing the block in the cache compressed
|
||||
if (cacheOnly && cacheCompressed && cacheOnRead) {
|
||||
LOG.debug("Skipping decompression of block in prefetch");
|
||||
// Cache the block if necessary
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
cache.cacheBlock(cacheKey,
|
||||
cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
|
||||
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
|
||||
}
|
||||
});
|
||||
|
||||
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
|
||||
HFile.DATABLOCK_READ_COUNT.increment();
|
||||
}
|
||||
return hfileBlock;
|
||||
}
|
||||
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
|
||||
// Cache the block if necessary
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
|
||||
cacheConf.isInMemory());
|
||||
}
|
||||
});
|
||||
|
|
|
@ -178,6 +178,14 @@ public class TestHFileBlockIndex {
|
|||
public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
|
||||
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
|
||||
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
|
||||
return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction, updateCacheMetrics,
|
||||
expectedBlockType, expectedDataBlockEncoding, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
|
||||
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
|
||||
DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException {
|
||||
if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) {
|
||||
hitCount += 1;
|
||||
return prevBlock;
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.hfile;
|
|||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
|
||||
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
@ -26,6 +27,7 @@ import static org.hamcrest.Matchers.hasItems;
|
|||
import static org.hamcrest.Matchers.not;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
|
@ -34,6 +36,8 @@ import java.util.List;
|
|||
import java.util.Random;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -47,6 +51,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
|||
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -148,6 +153,51 @@ public class TestPrefetch {
|
|||
}
|
||||
|
||||
private void readStoreFile(Path storeFilePath) throws Exception {
|
||||
readStoreFile(storeFilePath, (r, o) -> {
|
||||
HFileBlock block = null;
|
||||
try {
|
||||
block = r.readBlock(o, -1, false, true, false, true, null, null);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
return block;
|
||||
}, (key, block) -> {
|
||||
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
|
||||
if (
|
||||
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|
||||
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
|
||||
) {
|
||||
assertTrue(isCached);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
|
||||
readStoreFile(storeFilePath, (r, o) -> {
|
||||
HFileBlock block = null;
|
||||
try {
|
||||
block = r.readBlock(o, -1, false, true, false, true, null, null, true);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
return block;
|
||||
}, (key, block) -> {
|
||||
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
|
||||
if (block.getBlockType() == BlockType.DATA) {
|
||||
assertFalse(block.isUnpacked());
|
||||
} else if (
|
||||
block.getBlockType() == BlockType.ROOT_INDEX
|
||||
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
|
||||
) {
|
||||
assertTrue(block.isUnpacked());
|
||||
}
|
||||
assertTrue(isCached);
|
||||
});
|
||||
}
|
||||
|
||||
private void readStoreFile(Path storeFilePath,
|
||||
BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
|
||||
BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
|
||||
// Open the file
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
|
||||
|
||||
|
@ -155,29 +205,36 @@ public class TestPrefetch {
|
|||
// Sleep for a bit
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// Check that all of the data blocks were preloaded
|
||||
BlockCache blockCache = cacheConf.getBlockCache().get();
|
||||
long offset = 0;
|
||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
|
||||
HFileBlock block = readFunction.apply(reader, offset);
|
||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
||||
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
|
||||
if (
|
||||
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|
||||
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
|
||||
) {
|
||||
assertTrue(isCached);
|
||||
}
|
||||
validationFunction.accept(blockCacheKey, block);
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrefetchCompressed() throws Exception {
|
||||
conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
|
||||
cacheConf = new CacheConfig(conf, blockCache);
|
||||
HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
|
||||
.withBlockSize(DATA_BLOCK_SIZE).build();
|
||||
Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
|
||||
readStoreFileCacheOnly(storeFile);
|
||||
conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
|
||||
|
||||
}
|
||||
|
||||
private Path writeStoreFile(String fname) throws IOException {
|
||||
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
|
||||
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
|
||||
return writeStoreFile(fname, meta);
|
||||
}
|
||||
|
||||
private Path writeStoreFile(String fname, HFileContext context) throws IOException {
|
||||
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
|
||||
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
|
||||
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
|
||||
.withOutputDir(storeFileParentDir).withFileContext(context).build();
|
||||
Random rand = ThreadLocalRandom.current();
|
||||
final int rowLen = 32;
|
||||
for (int i = 0; i < NUM_KV; ++i) {
|
||||
|
|
Loading…
Reference in New Issue