ARTEMIS-1136 Improve UnpooledUnsafeDirectByteBufWrapper

This commit is contained in:
Francesco Nigro 2017-04-30 08:11:41 +02:00 committed by Clebert Suconic
parent 6df8c3a28d
commit 2cdc62572b
1 changed files with 250 additions and 38 deletions

View File

@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
@ -29,28 +30,59 @@ import java.nio.channels.ScatteringByteChannel;
import io.netty.util.internal.PlatformDependent;
/**
* A NIO direct {@link ByteBuffer} wrapper.
* Only ByteBuffer's manipulation operations are supported.
* A NIO {@link ByteBuffer}, byte[] and address direct access wrapper.
* Only content manipulation operations are supported.
* Is best suited only for encoding/decoding purposes.
*/
public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
private ByteBuffer buffer;
private int arrayOffset;
private byte[] array;
private long memoryAddress;
/**
* Creates a new direct buffer by wrapping the specified initial buffer.
*/
public UnpooledUnsafeDirectByteBufWrapper() {
super(0);
this.buffer = null;
this.arrayOffset = -1;
this.array = null;
this.memoryAddress = 0L;
}
public void wrap(long address, int length) {
this.memoryAddress = address;
this.arrayOffset = -1;
this.array = null;
this.buffer = null;
clear();
maxCapacity(length);
}
public void wrap(byte[] array, int srcIndex, int length) {
if (array != null) {
this.memoryAddress = 0L;
this.arrayOffset = srcIndex;
this.array = array;
this.buffer = null;
clear();
maxCapacity(length);
} else {
reset();
}
}
public void wrap(ByteBuffer buffer, int srcIndex, int length) {
if (buffer != null) {
this.buffer = buffer;
if (buffer.isDirect()) {
this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
this.arrayOffset = -1;
this.array = null;
} else {
this.arrayOffset = srcIndex;
this.array = buffer.array();
this.memoryAddress = 0L;
}
clear();
maxCapacity(length);
} else {
@ -61,13 +93,15 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
public void reset() {
this.buffer = null;
this.memoryAddress = 0L;
this.arrayOffset = -1;
this.array = null;
clear();
maxCapacity(0);
}
@Override
public boolean isDirect() {
return true;
return buffer != null ? buffer.isDirect() : false;
}
@Override
@ -95,22 +129,22 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
public boolean hasArray() {
return false;
return array != null;
}
@Override
public byte[] array() {
throw new UnsupportedOperationException("direct buffer");
return array;
}
@Override
public int arrayOffset() {
throw new UnsupportedOperationException("direct buffer");
return arrayOffset;
}
@Override
public boolean hasMemoryAddress() {
return true;
return memoryAddress != 0;
}
@Override
@ -120,70 +154,156 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
protected byte _getByte(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getByte(addr(index));
} else {
return UnsafeByteBufUtil.getByte(array, idx(index));
}
}
@Override
protected short _getShort(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getShort(addr(index));
} else {
return UnsafeByteBufUtil.getShort(array, idx(index));
}
}
@Override
protected short _getShortLE(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getShortLE(addr(index));
} else {
return UnsafeByteBufUtil.getShortLE(array, idx(index));
}
}
@Override
protected int _getUnsignedMedium(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
} else {
return UnsafeByteBufUtil.getUnsignedMedium(array, idx(index));
}
}
@Override
protected int _getUnsignedMediumLE(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
} else {
return UnsafeByteBufUtil.getUnsignedMediumLE(array, idx(index));
}
}
@Override
protected int _getInt(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getInt(addr(index));
} else {
return UnsafeByteBufUtil.getInt(array, idx(index));
}
}
@Override
protected int _getIntLE(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getIntLE(addr(index));
} else {
return UnsafeByteBufUtil.getIntLE(array, idx(index));
}
}
@Override
protected long _getLong(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getLong(addr(index));
} else {
return UnsafeByteBufUtil.getLong(array, idx(index));
}
}
@Override
protected long _getLongLE(int index) {
if (hasMemoryAddress()) {
return UnsafeByteBufUtil.getLongLE(addr(index));
} else {
return UnsafeByteBufUtil.getLongLE(array, idx(index));
}
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
} else {
final int idx = idx(index);
checkDstIndex(idx, length, dstIndex, dst.capacity());
getBytes(array, idx, dst, dstIndex, length);
}
return this;
}
private static void getBytes(byte[] array, int idx, ByteBuf dst, int dstIndex, int length) {
if (dst.hasMemoryAddress()) {
PlatformDependent.copyMemory(array, idx, dst.memoryAddress() + dstIndex, length);
} else if (dst.hasArray()) {
System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
dst.setBytes(dstIndex, array, idx, length);
}
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
} else {
final int idx = idx(index);
checkDstIndex(idx, length, dstIndex, dst.length);
System.arraycopy(array, idx, dst, dstIndex, length);
}
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
} else {
final int idx = idx(index);
checkIndex(idx, dst.remaining());
getBytes(array, idx, dst);
}
return this;
}
private static void getBytes(byte[] array, int idx, ByteBuffer dst) {
if (dst.remaining() == 0) {
return;
}
if (dst.isDirect()) {
if (dst.isReadOnly()) {
// We need to check if dst is ready-only so we not write something in it by using Unsafe.
throw new ReadOnlyBufferException();
}
// Copy to direct memory
final long dstAddress = PlatformDependent.directBufferAddress(dst);
PlatformDependent.copyMemory(array, idx, dstAddress + dst.position(), dst.remaining());
dst.position(dst.position() + dst.remaining());
} else if (dst.hasArray()) {
// Copy to array
System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
dst.position(dst.position() + dst.remaining());
} else {
dst.put(array, idx, dst.remaining());
}
}
@Override
public ByteBuf readBytes(ByteBuffer dst) {
int length = dst.remaining();
final int length = dst.remaining();
checkReadableBytes(length);
getBytes(readerIndex, dst);
readerIndex += length;
@ -192,67 +312,150 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
protected void _setByte(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setByte(addr(index), value);
} else {
UnsafeByteBufUtil.setByte(array, idx(index), value);
}
}
@Override
protected void _setShort(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setShort(addr(index), value);
} else {
UnsafeByteBufUtil.setShort(array, idx(index), value);
}
}
@Override
protected void _setShortLE(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setShortLE(addr(index), value);
} else {
UnsafeByteBufUtil.setShortLE(array, idx(index), value);
}
}
@Override
protected void _setMedium(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setMedium(addr(index), value);
} else {
UnsafeByteBufUtil.setMedium(array, idx(index), value);
}
}
@Override
protected void _setMediumLE(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setMediumLE(addr(index), value);
} else {
UnsafeByteBufUtil.setMediumLE(array, idx(index), value);
}
}
@Override
protected void _setInt(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setInt(addr(index), value);
} else {
UnsafeByteBufUtil.setInt(array, idx(index), value);
}
}
@Override
protected void _setIntLE(int index, int value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setIntLE(addr(index), value);
} else {
UnsafeByteBufUtil.setIntLE(array, idx(index), value);
}
}
@Override
protected void _setLong(int index, long value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setLong(addr(index), value);
} else {
UnsafeByteBufUtil.setLong(array, idx(index), value);
}
}
@Override
protected void _setLongLE(int index, long value) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setLongLE(addr(index), value);
} else {
UnsafeByteBufUtil.setLongLE(array, idx(index), value);
}
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
} else {
final int idx = idx(index);
checkSrcIndex(idx, length, srcIndex, src.capacity());
setBytes(array, idx, src, srcIndex, length);
}
return this;
}
private static void setBytes(byte[] array, int idx, ByteBuf src, int srcIndex, int length) {
if (src.hasMemoryAddress()) {
PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, idx, length);
} else if (src.hasArray()) {
System.arraycopy(src.array(), src.arrayOffset() + srcIndex, array, idx, length);
} else {
src.getBytes(srcIndex, array, idx, length);
}
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
} else {
final int idx = idx(index);
checkSrcIndex(idx, length, srcIndex, src.length);
System.arraycopy(src, srcIndex, array, idx, length);
}
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
} else {
final int idx = idx(index);
checkSrcIndex(idx, src.remaining(), src.position(), src.capacity());
setBytes(array, idx(index), src);
}
return this;
}
private static void setBytes(byte[] array, int idx, ByteBuffer src) {
final int length = src.remaining();
if (length == 0) {
return;
}
if (src.isDirect()) {
// Copy from direct memory
final long srcAddress = PlatformDependent.directBufferAddress(src);
PlatformDependent.copyMemory(srcAddress + src.position(), array, idx, length);
src.position(src.position() + length);
} else if (src.hasArray()) {
// Copy from array
System.arraycopy(src.array(), src.arrayOffset() + src.position(), array, idx, length);
src.position(src.position() + length);
} else {
src.get(array, idx, src.remaining());
}
}
@Override
@Deprecated
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
@ -303,7 +506,7 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
public int nioBufferCount() {
return 1;
return buffer == null ? 0 : 1;
}
@Override
@ -347,6 +550,10 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
return memoryAddress + index;
}
private int idx(int index) {
return arrayOffset + index;
}
@Override
@Deprecated
protected SwappedByteBuf newSwappedByteBuf() {
@ -355,7 +562,12 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC
@Override
public ByteBuf setZero(int index, int length) {
if (hasMemoryAddress()) {
UnsafeByteBufUtil.setZero(this, addr(index), index, length);
} else {
//prefer Arrays::fill here?
UnsafeByteBufUtil.setZero(array, idx(index), length);
}
return this;
}