diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index 3ad07622a4f..cb9dc84b94b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -97,6 +97,8 @@ public class FSDataInputStreamWrapper implements Closeable { private Boolean instanceOfCanUnbuffer = null; private CanUnbuffer unbuffer = null; + protected Path readerPath; + public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { this(fs, path, false, -1L); } @@ -127,6 +129,9 @@ public class FSDataInputStreamWrapper implements Closeable { // Initially we are going to read the tail block. Open the reader w/FS checksum. this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; this.stream = (link != null) ? link.open(hfs) : hfs.open(path); + this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream + ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath() + : path; setStreamOptions(stream); } @@ -342,4 +347,8 @@ public class FSDataInputStreamWrapper implements Closeable { } } } + + public Path getReaderPath() { + return readerPath; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index 4f140b0774d..c3ccb894d76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -83,7 +83,7 @@ public class FileLink { * FileLink InputStream that handles the switch between the original path and the alternative * locations, when the file is moved. */ - private static class FileLinkInputStream extends InputStream + protected static class FileLinkInputStream extends InputStream implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer { private FSDataInputStream in = null; private Path currentPath = null; @@ -282,6 +282,10 @@ public class FileLink { public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException { in.setDropBehind(dropCache); } + + public Path getCurrentPath() { + return currentPath; + } } private Path[] locations = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 0eb2aa7db00..1bd3d5d674f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,15 @@ public class HFilePreadReader extends HFileReaderImpl { public void run() { long offset = 0; long end = 0; + HFile.Reader prefetchStreamReader = null; try { + ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context) + .withReaderType(ReaderContext.ReaderType.STREAM) + .withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(), + context.getInputStreamWrapper().getReaderPath())) + .build(); + prefetchStreamReader = + new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf); end = getTrailer().getLoadOnOpenDataOffset(); if (LOG.isTraceEnabled()) { LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end)); @@ -56,8 +65,8 @@ public class HFilePreadReader extends HFileReaderImpl { // the internal-to-hfileblock thread local which holds the overread that gets the // next header, will not have happened...so, pass in the onDiskSize gotten from the // cached block. This 'optimization' triggers extremely rarely I'd say. - HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true, - /* pread= */true, false, false, null, null, true); + HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock, + /* cacheBlock= */true, /* pread= */false, false, false, null, null, true); try { onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); offset += block.getOnDiskSizeWithHeader(); @@ -77,6 +86,13 @@ public class HFilePreadReader extends HFileReaderImpl { // Other exceptions are interesting LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e); } finally { + if (prefetchStreamReader != null) { + try { + prefetchStreamReader.close(false); + } catch (IOException e) { + LOG.warn("Close prefetch stream reader failed, path: " + path, e); + } + } PrefetchExecutor.complete(path); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java index 0ec3de58fff..718f7fcb78a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java @@ -43,6 +43,18 @@ public class ReaderContextBuilder { public ReaderContextBuilder() { } + public static ReaderContextBuilder newBuilder(ReaderContext readerContext) { + return new ReaderContextBuilder(readerContext); + } + + private ReaderContextBuilder(ReaderContext readerContext) { + this.filePath = readerContext.getFilePath(); + this.fsdis = readerContext.getInputStreamWrapper(); + this.fileSize = readerContext.getFileSize(); + this.hfs = readerContext.getFileSystem(); + this.type = readerContext.getReaderType(); + } + public ReaderContextBuilder withFilePath(Path filePath) { this.filePath = filePath; return this; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index ee9fd5b7803..f6d18b52892 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -27,6 +27,7 @@ import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,15 +35,25 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.regionserver.TestHStoreFile; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -190,6 +201,60 @@ public class TestPrefetch { } + @Test + public void testPrefetchDoesntSkipHFileLink() throws Exception { + testPrefetchWhenHFileLink(c -> { + boolean isCached = c != null; + assertTrue(isCached); + }); + } + + private void testPrefetchWhenHFileLink(Consumer test) throws Exception { + cacheConf = new CacheConfig(conf, blockCache); + HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink"); + final RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build(); + // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ + Configuration testConf = new Configuration(this.conf); + CommonFSUtils.setRootDir(testConf, testDir); + HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, + CommonFSUtils.getTableDir(testDir, hri.getTable()), hri); + + // Make a store file and write data to it. + StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs) + .withFilePath(regionFs.createTempName()).withFileContext(context).build(); + TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"), + Bytes.toBytes("testPrefetchWhenHFileLink")); + + Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath()); + Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf")); + HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName()); + Path linkFilePath = + new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName())); + + // Try to open store file from link + StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true); + HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf); + assertTrue(storeFileInfo.isLink()); + + hsf.initReader(); + HFile.Reader reader = hsf.getReader().getHFileReader(); + while (!reader.prefetchComplete()) { + // Sleep for a bit + Thread.sleep(1000); + } + long offset = 0; + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true); + BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); + if (block.getBlockType() == BlockType.DATA) { + test.accept(blockCache.getBlock(blockCacheKey, true, false, true)); + } + offset += block.getOnDiskSizeWithHeader(); + } + } + private Path writeStoreFile(String fname) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); return writeStoreFile(fname, meta); @@ -227,5 +292,4 @@ public class TestPrefetch { return keyType; } } - }