commit 873f8987f1
parent 81dffda0d3
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
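Judging from the hunks below, this change makes prefetch-on-open work for HFileLink-backed files: FSDataInputStreamWrapper now records the link-resolved path it is actually reading, the prefetch thread opens a dedicated STREAM-type reader against that path instead of issuing preads through the shared reader, and a new test (testPrefetchDoesntSkipHFileLink) asserts that data blocks of a linked file land in the block cache.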
FSDataInputStreamWrapper.java

@@ -97,6 +97,8 @@ public class FSDataInputStreamWrapper implements Closeable {
   private Boolean instanceOfCanUnbuffer = null;
   private CanUnbuffer unbuffer = null;
 
+  protected Path readerPath;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
     this(fs, path, false, -1L);
   }
@@ -127,6 +129,9 @@ public class FSDataInputStreamWrapper implements Closeable {
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
+      ? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
+      : path;
     setStreamOptions(stream);
   }
 
@@ -342,4 +347,8 @@ public class FSDataInputStreamWrapper implements Closeable {
       }
     }
   }
+
+  public Path getReaderPath() {
+    return readerPath;
+  }
 }
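The wrapper now remembers which path is actually being read: for a FileLink-backed stream it is the link's currently resolved target, otherwise the path it was opened with. A minimal usage sketch of the new accessor, assuming a plain Hadoop Configuration; the file path and class name here are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;

public class ReaderPathDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/hbase/data/default/t1/r1/cf/hfile1"); // hypothetical path
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
    // For a non-link file this is just 'path'; for a FileLink it is the
    // candidate location the link actually opened.
    System.out.println("reading from " + wrapper.getReaderPath());
  }
}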
FileLink.java

@@ -83,7 +83,7 @@ public class FileLink {
    * FileLink InputStream that handles the switch between the original path and the alternative
    * locations, when the file is moved.
    */
-  private static class FileLinkInputStream extends InputStream
+  protected static class FileLinkInputStream extends InputStream
     implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
     private FSDataInputStream in = null;
     private Path currentPath = null;
@@ -286,6 +286,10 @@ public class FileLink {
     public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
       in.setDropBehind(dropCache);
     }
+
+    public Path getCurrentPath() {
+      return currentPath;
+    }
   }
 
   private Path[] locations = null;
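FileLinkInputStream is widened from private to protected so that FSDataInputStreamWrapper can name the type, and it now exposes which of the link's candidate locations is actually open. A sketch of the resolution idiom, as a hypothetical helper placed in the same org.apache.hadoop.hbase.io package (the class is still not public):

package org.apache.hadoop.hbase.io;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

// Hypothetical helper; it must live alongside FileLink because
// FileLinkInputStream is protected rather than public.
final class LinkPathResolver {
  static Path resolve(FSDataInputStream stream, Path fallback) {
    // A FileLink stream knows which candidate location it actually opened;
    // any other stream is simply reading the path it was given.
    return stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
      ? ((FileLink.FileLinkInputStream) stream.getWrappedStream()).getCurrentPath()
      : fallback;
  }
}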
HFilePreadReader.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,15 @@ public class HFilePreadReader extends HFileReaderImpl {
       public void run() {
         long offset = 0;
         long end = 0;
+        HFile.Reader prefetchStreamReader = null;
         try {
+          ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
+            .withReaderType(ReaderContext.ReaderType.STREAM)
+            .withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
+              context.getInputStreamWrapper().getReaderPath()))
+            .build();
+          prefetchStreamReader =
+            new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf);
           end = getTrailer().getLoadOnOpenDataOffset();
           if (LOG.isTraceEnabled()) {
             LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
@@ -56,8 +65,8 @@ public class HFilePreadReader extends HFileReaderImpl {
             // the internal-to-hfileblock thread local which holds the overread that gets the
             // next header, will not have happened...so, pass in the onDiskSize gotten from the
             // cached block. This 'optimization' triggers extremely rarely I'd say.
-            HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
-              /* pread= */true, false, false, null, null, true);
+            HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
+              /* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
             try {
               onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
               offset += block.getOnDiskSizeWithHeader();
@@ -77,6 +86,13 @@ public class HFilePreadReader extends HFileReaderImpl {
           // Other exceptions are interesting
           LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
         } finally {
+          if (prefetchStreamReader != null) {
+            try {
+              prefetchStreamReader.close(false);
+            } catch (IOException e) {
+              LOG.warn("Close prefetch stream reader failed, path: " + path, e);
+            }
+          }
           PrefetchExecutor.complete(path);
         }
       }
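The prefetch thread now builds a STREAM-type ReaderContext over the link-resolved path, drives a dedicated HFileStreamReader sequentially instead of issuing preads through the shared reader, and closes that extra reader in the finally block. For orientation, here is the new readBlock call restated with one argument per line; the parameter names in the comments are my reading of HFile.Reader#readBlock in this branch, not something the diff itself states:

HFileBlock block = prefetchStreamReader.readBlock(
  offset,                 // byte offset of the block within the file
  onDiskSizeOfNextBlock,  // on-disk size hint from the previous block, -1 if unknown
  true,                   // cacheBlock: populate the block cache
  false,                  // pread: false, so the stream reader reads sequentially
  false,                  // isCompaction
  false,                  // updateCacheMetrics
  null,                   // expectedBlockType: not enforced
  null,                   // expectedDataBlockEncoding: not enforced
  true);                  // trailing flag (assumed to be cacheOnly)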
ReaderContextBuilder.java

@@ -43,6 +43,18 @@ public class ReaderContextBuilder {
   public ReaderContextBuilder() {
   }
 
+  public static ReaderContextBuilder newBuilder(ReaderContext readerContext) {
+    return new ReaderContextBuilder(readerContext);
+  }
+
+  private ReaderContextBuilder(ReaderContext readerContext) {
+    this.filePath = readerContext.getFilePath();
+    this.fsdis = readerContext.getInputStreamWrapper();
+    this.fileSize = readerContext.getFileSize();
+    this.hfs = readerContext.getFileSystem();
+    this.type = readerContext.getReaderType();
+  }
+
   public ReaderContextBuilder withFilePath(Path filePath) {
     this.filePath = filePath;
     return this;
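The builder gains a copy-constructor entry point: newBuilder(ReaderContext) seeds a fresh builder from an existing context so a caller can override just the fields it needs. This is exactly how HFilePreadReader uses it above; a minimal sketch, assuming an in-scope ReaderContext named ctx and the usual org.apache.hadoop.hbase.io.hfile imports:

// Clone ctx but switch the reader type; all other fields carry over.
ReaderContext streamCtx = ReaderContextBuilder.newBuilder(ctx)
  .withReaderType(ReaderContext.ReaderType.STREAM)
  .build();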
TestPrefetch.java

@@ -56,16 +56,20 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -252,6 +256,14 @@ public class TestPrefetch {
     });
   }
 
+  @Test
+  public void testPrefetchDoesntSkipHFileLink() throws Exception {
+    testPrefetchWhenHFileLink(c -> {
+      boolean isCached = c != null;
+      assertTrue(isCached);
+    });
+  }
+
   private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
     throws Exception {
     cacheConf = new CacheConfig(conf, blockCache);
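The new test passes a Consumer<Cacheable> that asserts each looked-up block is non-null, i.e. every data block of the linked file was cached by prefetch; the helper it calls is added in the next hunk.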
@@ -287,6 +299,52 @@
     }
   }
 
+  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
+    cacheConf = new CacheConfig(conf, blockCache);
+    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
+    final RegionInfo hri =
+      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
+    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
+    Configuration testConf = new Configuration(this.conf);
+    CommonFSUtils.setRootDir(testConf, testDir);
+    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
+      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
+
+    // Make a store file and write data to it.
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
+    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
+      Bytes.toBytes("testPrefetchWhenHFileLink"));
+
+    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
+    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
+    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
+    Path linkFilePath =
+      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
+
+    // Try to open store file from link
+    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
+    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
+    assertTrue(storeFileInfo.isLink());
+
+    hsf.initReader();
+    HFile.Reader reader = hsf.getReader().getHFileReader();
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
+      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
+      if (block.getBlockType() == BlockType.DATA) {
+        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
+      }
+      offset += block.getOnDiskSizeWithHeader();
+    }
+  }
+
   private Path writeStoreFile(String fname) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     return writeStoreFile(fname, meta);
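Taken together, the helper exercises exactly the code path the fix targets: it commits a real store file, creates an HFileLink to it, and opens the store file through the link so prefetch-on-open runs against the link-resolved path. Once prefetchComplete() reports done, it walks the file block by block up to the load-on-open offset, mirroring the prefetch loop, and hands each data block's cache entry to the supplied assertion.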