HBASE-14020 Unsafe based optimized write in ByteBufferOutputStream.

This commit is contained in:
anoopsjohn 2015-07-06 10:44:38 +05:30
parent 1f9bf419c1
commit 7d3456d8fd
3 changed files with 89 additions and 11 deletions

View File

@ -28,6 +28,7 @@ import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
@ -89,7 +90,7 @@ public class ByteBufferOutputStream extends OutputStream {
newSize = Math.max(newSize, buf.position() + extra); newSize = Math.max(newSize, buf.position() + extra);
ByteBuffer newBuf = allocate(newSize, buf.isDirect()); ByteBuffer newBuf = allocate(newSize, buf.isDirect());
buf.flip(); buf.flip();
newBuf.put(buf); ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
buf = newBuf; buf = newBuf;
} }
} }
@ -98,7 +99,6 @@ public class ByteBufferOutputStream extends OutputStream {
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE); checkSizeAndGrow(Bytes.SIZEOF_BYTE);
buf.put((byte)b); buf.put((byte)b);
} }
@ -118,16 +118,13 @@ public class ByteBufferOutputStream extends OutputStream {
@Override @Override
public void write(byte[] b) throws IOException { public void write(byte[] b) throws IOException {
checkSizeAndGrow(b.length); write(b, 0, b.length);
buf.put(b);
} }
@Override @Override
public void write(byte[] b, int off, int len) throws IOException { public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len); checkSizeAndGrow(len);
ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
buf.put(b, off, len);
} }
/** /**
@ -138,7 +135,7 @@ public class ByteBufferOutputStream extends OutputStream {
*/ */
public void writeInt(int i) throws IOException { public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT); checkSizeAndGrow(Bytes.SIZEOF_INT);
this.buf.putInt(i); ByteBufferUtils.putInt(this.buf, i);
} }
@Override @Override

View File

@ -353,6 +353,23 @@ public final class ByteBufferUtils {
} }
} }
/**
* Copy one buffer's whole data to another. Write starts at the current position of 'out' buffer.
* Note : This will advance the position marker of {@code out} but not change the position maker
* for {@code in}. The position and limit of the {@code in} buffer to be set properly by caller.
* @param in source buffer
* @param out destination buffer
*/
public static void copyFromBufferToBuffer(ByteBuffer in, ByteBuffer out) {
if (UnsafeAccess.isAvailable()) {
int length = in.remaining();
UnsafeAccess.copy(in, in.position(), out, out.position(), length);
out.position(out.position() + length);
} else {
out.put(in);
}
}
/** /**
* Copy from one buffer to another from given offset. This will be absolute positional copying and * Copy from one buffer to another from given offset. This will be absolute positional copying and
* won't affect the position of any of the buffers. * won't affect the position of any of the buffers.
@ -632,6 +649,21 @@ public final class ByteBufferUtils {
} }
} }
/**
* Put an int value out to the given ByteBuffer's current position in big-endian format.
* This also advances the position in buffer by int size.
* @param buffer the ByteBuffer to write to
* @param val int to write out
*/
public static void putInt(ByteBuffer buffer, int val) {
if (UnsafeAccess.isAvailable()) {
int newPos = UnsafeAccess.putInt(buffer, buffer.position(), val);
buffer.position(newPos);
} else {
buffer.putInt(val);
}
}
/** /**
* Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes
* to buffer's current position. This also advances the position in the 'out' buffer by 'length' * to buffer's current position. This also advances the position in the 'out' buffer by 'length'

View File

@ -250,10 +250,28 @@ public final class UnsafeAccess {
return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
} }
/**
* Put an int value out to the specified ByteBuffer offset in big-endian format.
* @param buf the ByteBuffer to write to
* @param offset offset in the ByteBuffer
* @param val int to write out
* @return incremented offset
*/
public static int putInt(ByteBuffer buf, int offset, int val) {
if (littleEndian) {
val = Integer.reverseBytes(val);
}
if (buf.isDirect()) {
theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val);
} else {
theUnsafe.putInt(buf.array(), offset + buf.arrayOffset() + BYTE_ARRAY_BASE_OFFSET, val);
}
return offset + Bytes.SIZEOF_INT;
}
// APIs to copy data. This will be direct memory location copy and will be much faster // APIs to copy data. This will be direct memory location copy and will be much faster
/** /**
* Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes * Copies the bytes from given array's offset to length part into the given buffer.
* to buffer's current position.
* @param src * @param src
* @param srcOffset * @param srcOffset
* @param dest * @param dest
@ -274,7 +292,9 @@ public final class UnsafeAccess {
} }
/** /**
* Copies specified number of bytes from given offset of 'in' ByteBuffer to the array. * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the
* {@code dest} array.
*
* @param src * @param src
* @param srcOffset * @param srcOffset
* @param dest * @param dest
@ -294,4 +314,33 @@ public final class UnsafeAccess {
long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET; long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET;
theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length); theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length);
} }
/**
* Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest}
* buffer.
*
* @param src
* @param srcOffset
* @param dest
* @param destOffset
* @param length
*/
public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset,
int length) {
long srcAddress, destAddress;
Object srcBase = null, destBase = null;
if (src.isDirect()) {
srcAddress = srcOffset + ((DirectBuffer) src).address();
} else {
srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET;
srcBase = src.array();
}
if (dest.isDirect()) {
destAddress = destOffset + ((DirectBuffer) dest).address();
} else {
destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
destBase = dest.array();
}
theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length);
}
} }