HBASE-27013 Introduce read all bytes when using pread for prefetch (#4414)
- introduce optional flag `hfile.pread.all.bytes.enabled` for pread that must read full bytes with the next block header
This commit is contained in:
parent
ba7ef0216a
commit
0eb1a4e5ed
|
@ -1011,6 +1011,13 @@ public final class HConstants {
|
|||
public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
|
||||
32 * 1024 * 1024L;
|
||||
|
||||
/**
|
||||
* Configuration key for setting pread must read both necessaryLen and extraLen, default is
|
||||
* disabled. This is an optimized flag for reading HFile from blob storage.
|
||||
*/
|
||||
public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
|
||||
public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;
|
||||
|
||||
/*
|
||||
* Minimum percentage of free heap necessary for a successful cluster startup.
|
||||
*/
|
||||
|
|
|
@ -228,21 +228,43 @@ public final class BlockIOUtils {
|
|||
*/
|
||||
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
|
||||
int necessaryLen, int extraLen) throws IOException {
|
||||
return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from an input stream at least <code>necessaryLen</code> and if possible,
|
||||
* <code>extraLen</code> also if available. Analogous to
|
||||
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
|
||||
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
|
||||
* read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
|
||||
* directly, and does not allocate a temporary byte array.
|
||||
* @param buff ByteBuff to read into.
|
||||
* @param dis the input stream to read from
|
||||
* @param position the position within the stream from which to start reading
|
||||
* @param necessaryLen the number of bytes that are absolutely necessary to read
|
||||
* @param extraLen the number of extra bytes that would be nice to read
|
||||
* @param readAllBytes whether we must read the necessaryLen and extraLen
|
||||
* @return true if and only if extraLen is > 0 and reading those extra bytes was successful
|
||||
* @throws IOException if failed to read the necessary bytes
|
||||
*/
|
||||
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
|
||||
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
|
||||
boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
|
||||
|
||||
if (preadbytebuffer) {
|
||||
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
|
||||
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
|
||||
} else {
|
||||
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
|
||||
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
|
||||
int necessaryLen, int extraLen) throws IOException {
|
||||
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
|
||||
int remain = necessaryLen + extraLen;
|
||||
byte[] buf = new byte[remain];
|
||||
int bytesRead = 0;
|
||||
while (bytesRead < necessaryLen) {
|
||||
int lengthMustRead = readAllBytes ? remain : necessaryLen;
|
||||
while (bytesRead < lengthMustRead) {
|
||||
int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
|
||||
if (ret < 0) {
|
||||
throw new IOException("Premature EOF from inputStream (positional read returned " + ret
|
||||
|
@ -257,11 +279,12 @@ public final class BlockIOUtils {
|
|||
}
|
||||
|
||||
private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
|
||||
int necessaryLen, int extraLen) throws IOException {
|
||||
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
|
||||
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
|
||||
ByteBuffer[] buffers = buff.nioByteBuffers();
|
||||
ByteBuffer cur = buffers[idx];
|
||||
while (bytesRead < necessaryLen) {
|
||||
int lengthMustRead = readAllBytes ? remain : necessaryLen;
|
||||
while (bytesRead < lengthMustRead) {
|
||||
int ret;
|
||||
while (!cur.hasRemaining()) {
|
||||
if (++idx >= buffers.length) {
|
||||
|
|
|
@ -1349,6 +1349,8 @@ public class HFileBlock implements Cacheable {
|
|||
|
||||
private final Lock streamLock = new ReentrantLock();
|
||||
|
||||
private final boolean isPreadAllBytes;
|
||||
|
||||
FSReaderImpl(ReaderContext readerContext, HFileContext fileContext, ByteBuffAllocator allocator,
|
||||
Configuration conf) throws IOException {
|
||||
this.fileSize = readerContext.getFileSize();
|
||||
|
@ -1365,6 +1367,7 @@ public class HFileBlock implements Cacheable {
|
|||
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
|
||||
defaultDecodingCtx = new HFileBlockDefaultDecodingContext(conf, fileContext);
|
||||
encodedBlockDecodingCtx = defaultDecodingCtx;
|
||||
isPreadAllBytes = readerContext.isPreadAllBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1453,7 +1456,9 @@ public class HFileBlock implements Cacheable {
|
|||
} else {
|
||||
// Positional read. Better for random reads; or when the streamLock is already locked.
|
||||
int extraSize = peekIntoNextBlock ? hdrSize : 0;
|
||||
if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
|
||||
if (
|
||||
!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, isPreadAllBytes)
|
||||
) {
|
||||
// did not read the next block header.
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -39,6 +40,7 @@ public class ReaderContext {
|
|||
private final HFileSystem hfs;
|
||||
private final boolean primaryReplicaReader;
|
||||
private final ReaderType type;
|
||||
private final boolean preadAllBytes;
|
||||
|
||||
public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
|
||||
HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
|
||||
|
@ -48,6 +50,8 @@ public class ReaderContext {
|
|||
this.hfs = hfs;
|
||||
this.primaryReplicaReader = primaryReplicaReader;
|
||||
this.type = type;
|
||||
this.preadAllBytes = hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
|
||||
HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
|
||||
}
|
||||
|
||||
public Path getFilePath() {
|
||||
|
@ -73,4 +77,8 @@ public class ReaderContext {
|
|||
public ReaderType getReaderType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public boolean isPreadAllBytes() {
|
||||
return preadAllBytes;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -28,15 +29,23 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Random;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.nio.MultiByteBuff;
|
||||
|
@ -44,11 +53,13 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
|||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({ IOTests.class, SmallTests.class })
|
||||
public class TestBlockIOUtils {
|
||||
|
@ -57,11 +68,17 @@ public class TestBlockIOUtils {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestBlockIOUtils.class);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||
|
||||
private static final int NUM_TEST_BLOCKS = 2;
|
||||
private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;
|
||||
|
||||
@Test
|
||||
public void testIsByteBufferReadable() throws IOException {
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
|
@ -92,6 +109,103 @@ public class TestBlockIOUtils {
|
|||
assertArrayEquals(Bytes.toBytes(s), heapBuf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreadWithReadFullBytes() throws IOException {
|
||||
testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreadWithoutReadFullBytes() throws IOException {
|
||||
testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime());
|
||||
}
|
||||
|
||||
private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed)
|
||||
throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
|
||||
// give a fixed seed such we can see failure easily.
|
||||
Random rand = new Random(randomSeed);
|
||||
long totalDataBlockBytes =
|
||||
writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
|
||||
readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
|
||||
}
|
||||
|
||||
private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
|
||||
Path path) throws IOException {
|
||||
FileSystem fs = HFileSystem.get(conf);
|
||||
FSDataOutputStream os = fs.create(path);
|
||||
HFileContext meta =
|
||||
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
|
||||
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
|
||||
long totalDataBlockBytes = 0;
|
||||
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
|
||||
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
|
||||
if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
|
||||
blockTypeOrdinal = BlockType.DATA.ordinal();
|
||||
}
|
||||
BlockType bt = BlockType.values()[blockTypeOrdinal];
|
||||
DataOutputStream dos = hbw.startWriting(bt);
|
||||
int size = rand.nextInt(500);
|
||||
for (int j = 0; j < size; ++j) {
|
||||
dos.writeShort(i + 1);
|
||||
dos.writeInt(j + 1);
|
||||
}
|
||||
|
||||
hbw.writeHeaderAndData(os);
|
||||
totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
|
||||
}
|
||||
// append a dummy trailer and in a actual HFile it should have more data.
|
||||
FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
|
||||
trailer.setFirstDataBlockOffset(0);
|
||||
trailer.setLastDataBlockOffset(totalDataBlockBytes);
|
||||
trailer.setComparatorClass(meta.getCellComparator().getClass());
|
||||
trailer.setDataIndexCount(NUM_TEST_BLOCKS);
|
||||
trailer.setCompressionCodec(compressAlgo);
|
||||
trailer.serialize(os);
|
||||
// close the stream
|
||||
os.close();
|
||||
return totalDataBlockBytes;
|
||||
}
|
||||
|
||||
private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
|
||||
long totalDataBlockBytes) throws IOException {
|
||||
FSDataInputStream is = fs.open(path);
|
||||
HFileContext fileContext =
|
||||
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
|
||||
ReaderContext context =
|
||||
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
|
||||
.withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
|
||||
.withFilePath(path).withFileSystem(fs).build();
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());
|
||||
|
||||
long onDiskSizeOfNextBlock = -1;
|
||||
long offset = 0;
|
||||
int numOfReadBlock = 0;
|
||||
// offset and totalBytes shares the same logic in the HFilePreadReader
|
||||
while (offset < totalDataBlockBytes) {
|
||||
HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
|
||||
numOfReadBlock++;
|
||||
try {
|
||||
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
} finally {
|
||||
block.release();
|
||||
}
|
||||
}
|
||||
assertEquals(totalDataBlockBytes, offset);
|
||||
assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
|
||||
deleteFile(fs, path);
|
||||
}
|
||||
|
||||
private void deleteFile(FileSystem fs, Path path) throws IOException {
|
||||
if (fs.exists(path)) {
|
||||
fs.delete(path, true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWithExtra() throws IOException {
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
|
|
Loading…
Reference in New Issue