From 7d3456d8fd027e252b1da7578e943f146626135d Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Mon, 6 Jul 2015 10:44:38 +0530 Subject: [PATCH] HBASE-14020 Unsafe based optimized write in ByteBufferOutputStream. --- .../hbase/io/ByteBufferOutputStream.java | 13 ++--- .../hadoop/hbase/util/ByteBufferUtils.java | 32 +++++++++++ .../hadoop/hbase/util/UnsafeAccess.java | 55 ++++++++++++++++++- 3 files changed, 89 insertions(+), 11 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index 25b5d51b6d6..1a18b2dcd8e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -28,6 +28,7 @@ import java.nio.channels.WritableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; /** @@ -89,7 +90,7 @@ public class ByteBufferOutputStream extends OutputStream { newSize = Math.max(newSize, buf.position() + extra); ByteBuffer newBuf = allocate(newSize, buf.isDirect()); buf.flip(); - newBuf.put(buf); + ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf); buf = newBuf; } } @@ -98,7 +99,6 @@ public class ByteBufferOutputStream extends OutputStream { @Override public void write(int b) throws IOException { checkSizeAndGrow(Bytes.SIZEOF_BYTE); - buf.put((byte)b); } @@ -118,16 +118,13 @@ public class ByteBufferOutputStream extends OutputStream { @Override public void write(byte[] b) throws IOException { - checkSizeAndGrow(b.length); - - buf.put(b); + write(b, 0, b.length); } @Override public void write(byte[] b, int off, int len) throws IOException { checkSizeAndGrow(len); - - buf.put(b, off, len); + ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len); } /** @@ -138,7 +135,7 @@ public class ByteBufferOutputStream extends OutputStream { */ public void writeInt(int i) throws IOException { checkSizeAndGrow(Bytes.SIZEOF_INT); - this.buf.putInt(i); + ByteBufferUtils.putInt(this.buf, i); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 33e5cc6e265..3dbb9be98ce 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -353,6 +353,23 @@ public final class ByteBufferUtils { } } + /** + * Copy one buffer's whole data to another. Write starts at the current position of 'out' buffer. + * Note : This will advance the position marker of {@code out} but not change the position maker + * for {@code in}. The position and limit of the {@code in} buffer to be set properly by caller. + * @param in source buffer + * @param out destination buffer + */ + public static void copyFromBufferToBuffer(ByteBuffer in, ByteBuffer out) { + if (UnsafeAccess.isAvailable()) { + int length = in.remaining(); + UnsafeAccess.copy(in, in.position(), out, out.position(), length); + out.position(out.position() + length); + } else { + out.put(in); + } + } + /** * Copy from one buffer to another from given offset. This will be absolute positional copying and * won't affect the position of any of the buffers. @@ -632,6 +649,21 @@ public final class ByteBufferUtils { } } + /** + * Put an int value out to the given ByteBuffer's current position in big-endian format. + * This also advances the position in buffer by int size. + * @param buffer the ByteBuffer to write to + * @param val int to write out + */ + public static void putInt(ByteBuffer buffer, int val) { + if (UnsafeAccess.isAvailable()) { + int newPos = UnsafeAccess.putInt(buffer, buffer.position(), val); + buffer.position(newPos); + } else { + buffer.putInt(val); + } + } + /** * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes * to buffer's current position. This also advances the position in the 'out' buffer by 'length' diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index deb9a1a7449..ea13dbc8122 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -250,10 +250,28 @@ public final class UnsafeAccess { return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); } + /** + * Put an int value out to the specified ByteBuffer offset in big-endian format. + * @param buf the ByteBuffer to write to + * @param offset offset in the ByteBuffer + * @param val int to write out + * @return incremented offset + */ + public static int putInt(ByteBuffer buf, int offset, int val) { + if (littleEndian) { + val = Integer.reverseBytes(val); + } + if (buf.isDirect()) { + theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val); + } else { + theUnsafe.putInt(buf.array(), offset + buf.arrayOffset() + BYTE_ARRAY_BASE_OFFSET, val); + } + return offset + Bytes.SIZEOF_INT; + } + // APIs to copy data. This will be direct memory location copy and will be much faster /** - * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes - * to buffer's current position. + * Copies the bytes from given array's offset to length part into the given buffer. * @param src * @param srcOffset * @param dest @@ -274,7 +292,9 @@ public final class UnsafeAccess { } /** - * Copies specified number of bytes from given offset of 'in' ByteBuffer to the array. + * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the + * {@code dest} array. + * * @param src * @param srcOffset * @param dest @@ -294,4 +314,33 @@ public final class UnsafeAccess { long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET; theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length); } + + /** + * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest} + * buffer. + * + * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset, + int length) { + long srcAddress, destAddress; + Object srcBase = null, destBase = null; + if (src.isDirect()) { + srcAddress = srcOffset + ((DirectBuffer) src).address(); + } else { + srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET; + srcBase = src.array(); + } + if (dest.isDirect()) { + destAddress = destOffset + ((DirectBuffer) dest).address(); + } else { + destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); + destBase = dest.array(); + } + theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length); + } }