diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java index a98a47879c6..01eda2dfa8c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.io.util; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import org.apache.hadoop.fs.ByteBufferReadable; @@ -27,15 +29,36 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.io.IOUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private public final class BlockIOUtils { + private static final Logger LOG = + LoggerFactory.getLogger(BlockIOUtils.class); + // TODO: remove the reflection when we update to Hadoop 3.3 or above. + private static Method byteBufferPositionedReadMethod; + + static { + initByteBufferPositionReadableMethod(); + } // Disallow instantiation private BlockIOUtils() { } + private static void initByteBufferPositionReadableMethod() { + try { + //long position, ByteBuffer buf + byteBufferPositionedReadMethod = FSDataInputStream.class.getMethod("read", long.class, + ByteBuffer.class); + } catch (NoSuchMethodException e) { + LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. " + + "preadWithExtra() will use a temporary on-heap byte array."); + } + } + public static boolean isByteBufferReadable(FSDataInputStream is) { InputStream cur = is.getWrappedStream(); for (;;) { @@ -197,6 +220,10 @@ public final class BlockIOUtils { * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to * read. + * + * If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer + * directly, and does not allocate a temporary byte array. + * * @param buff ByteBuff to read into. * @param dis the input stream to read from * @param position the position within the stream from which to start reading @@ -207,6 +234,17 @@ public final class BlockIOUtils { */ public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, int necessaryLen, int extraLen) throws IOException { + boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer"); + + if (preadbytebuffer) { + return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen); + } else { + return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen); + } + } + + private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position, + int necessaryLen, int extraLen) throws IOException { int remain = necessaryLen + extraLen; byte[] buf = new byte[remain]; int bytesRead = 0; @@ -220,15 +258,49 @@ public final class BlockIOUtils { bytesRead += ret; remain -= ret; } - // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we - // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[]. - // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[] - // preadWithExtra method for the upper layer, only need to refactor this method if the - // ByteBuffer pread is OK. copyToByteBuff(buf, 0, bytesRead, buff); return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); } + private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position, + int necessaryLen, int extraLen) throws IOException { + int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0; + ByteBuffer[] buffers = buff.nioByteBuffers(); + ByteBuffer cur = buffers[idx]; + while (bytesRead < necessaryLen) { + int ret; + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + try { + ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); + } catch (IllegalAccessException e) { + throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read " + + bytesRead + " bytes from position " + position, e); + } catch (InvocationTargetException e) { + throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" + + " when trying to read " + bytesRead + " bytes from position " + position, e); + } catch (NullPointerException e) { + throw new IOException("something is null"); + } catch (Exception e) { + throw e; + } + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (positional read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + } + + return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); + } + private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out) throws IOException { if (offset < 0 || len < 0 || offset + len > buf.length) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java index 538dc488db2..099a7809dee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java @@ -21,12 +21,15 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import org.apache.hadoop.fs.FSDataInputStream; @@ -138,9 +141,11 @@ public class TestBlockIOUtils { ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); + when(in.hasCapability(anyString())).thenReturn(false); boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertFalse("Expect false return when no extra bytes requested", ret); verify(in).read(position, buf, bufOffset, totalLen); + verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); } @@ -156,10 +161,12 @@ public class TestBlockIOUtils { FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(5, buf, 5, 5)).thenReturn(5); + when(in.hasCapability(anyString())).thenReturn(false); boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertFalse("Expect false return when no extra bytes requested", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(5, buf, 5, 5); + verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); } @@ -174,9 +181,11 @@ public class TestBlockIOUtils { ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); + when(in.hasCapability(anyString())).thenReturn(false); boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertTrue("Expect true return when reading extra bytes succeeds", ret); verify(in).read(position, buf, bufOffset, totalLen); + verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); } @@ -191,9 +200,11 @@ public class TestBlockIOUtils { ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen); + when(in.hasCapability(anyString())).thenReturn(false); boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertFalse("Expect false return when reading extra bytes fails", ret); verify(in).read(position, buf, bufOffset, totalLen); + verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); } @@ -210,10 +221,12 @@ public class TestBlockIOUtils { FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(5, buf, 5, 10)).thenReturn(10); + when(in.hasCapability(anyString())).thenReturn(false); boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertTrue("Expect true return when reading extra bytes succeeds", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(5, buf, 5, 10); + verify(in).hasCapability(anyString()); verifyNoMoreInteractions(in); } @@ -229,8 +242,89 @@ public class TestBlockIOUtils { FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1); + when(in.hasCapability(anyString())).thenReturn(false); exception.expect(IOException.class); exception.expectMessage("EOF"); BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); } + + /** + * Determine if ByteBufferPositionedReadable API is available + * . + * @return true if FSDataInputStream implements ByteBufferPositionedReadable API. + */ + private boolean isByteBufferPositionedReadable() { + try { + //long position, ByteBuffer buf + FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class); + } catch (NoSuchMethodException e) { + return false; + } + return true; + } + + public static class MyFSDataInputStream extends FSDataInputStream { + public MyFSDataInputStream(InputStream in) { + super(in); + } + + // This is the ByteBufferPositionReadable API we want to test. + // Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop + // does not implement the interface, and it wouldn't compile trying to mock the method. + // So explicitly declare the method here to make mocking possible. + public int read(long position, ByteBuffer buf) throws IOException { + return 0; + } + } + + @Test + public void testByteBufferPositionedReadable() throws IOException { + assumeTrue("Skip the test because ByteBufferPositionedReadable is not available", + isByteBufferPositionedReadable()); + long position = 0; + int necessaryLen = 10; + int extraLen = 1; + int totalLen = necessaryLen + extraLen; + int firstReadLen = 6; + int secondReadLen = totalLen - firstReadLen; + ByteBuffer buf = ByteBuffer.allocate(totalLen); + ByteBuff bb = new SingleByteBuff(buf); + MyFSDataInputStream in = mock(MyFSDataInputStream.class); + + when(in.read(position, buf)).thenReturn(firstReadLen); + when(in.read(firstReadLen, buf)).thenReturn(secondReadLen); + when(in.hasCapability(anyString())).thenReturn(true); + boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + assertTrue("Expect true return when reading extra bytes succeeds", ret); + verify(in).read(position, buf); + verify(in).read(firstReadLen, buf); + verify(in).hasCapability(anyString()); + verifyNoMoreInteractions(in); + } + + @Test + public void testByteBufferPositionedReadableEOF() throws IOException { + assumeTrue("Skip the test because ByteBufferPositionedReadable is not available", + isByteBufferPositionedReadable()); + long position = 0; + int necessaryLen = 10; + int extraLen = 0; + int totalLen = necessaryLen + extraLen; + int firstReadLen = 9; + ByteBuffer buf = ByteBuffer.allocate(totalLen); + ByteBuff bb = new SingleByteBuff(buf); + MyFSDataInputStream in = mock(MyFSDataInputStream.class); + + when(in.read(position, buf)).thenReturn(firstReadLen); + when(in.read(position, buf)).thenReturn(-1); + when(in.hasCapability(anyString())).thenReturn(true); + exception.expect(IOException.class); + exception.expectMessage("EOF"); + BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); + + verify(in).read(position, buf); + verify(in).read(firstReadLen, buf); + verify(in).hasCapability(anyString()); + verifyNoMoreInteractions(in); + } }