From 2cdc62572bb42de0f98d96e53fb47dc89012a30c Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Sun, 30 Apr 2017 08:11:41 +0200 Subject: [PATCH] ARTEMIS-1136 Improve UnpooledUnsafeDirectByteBufWrapper --- .../UnpooledUnsafeDirectByteBufWrapper.java | 288 +++++++++++++++--- 1 file changed, 250 insertions(+), 38 deletions(-) diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java index 0da33c68ea..a4346dc0c8 100644 --- a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java +++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.ReadOnlyBufferException; import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; @@ -29,28 +30,59 @@ import java.nio.channels.ScatteringByteChannel; import io.netty.util.internal.PlatformDependent; /** - * A NIO direct {@link ByteBuffer} wrapper. - * Only ByteBuffer's manipulation operations are supported. + * A NIO {@link ByteBuffer}, byte[] and address direct access wrapper. + * Only content manipulation operations are supported. * Is best suited only for encoding/decoding purposes. */ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf { private ByteBuffer buffer; + private int arrayOffset; + private byte[] array; private long memoryAddress; - /** - * Creates a new direct buffer by wrapping the specified initial buffer. - */ public UnpooledUnsafeDirectByteBufWrapper() { super(0); this.buffer = null; + this.arrayOffset = -1; + this.array = null; this.memoryAddress = 0L; } + public void wrap(long address, int length) { + this.memoryAddress = address; + this.arrayOffset = -1; + this.array = null; + this.buffer = null; + clear(); + maxCapacity(length); + } + + public void wrap(byte[] array, int srcIndex, int length) { + if (array != null) { + this.memoryAddress = 0L; + this.arrayOffset = srcIndex; + this.array = array; + this.buffer = null; + clear(); + maxCapacity(length); + } else { + reset(); + } + } + public void wrap(ByteBuffer buffer, int srcIndex, int length) { if (buffer != null) { this.buffer = buffer; - this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex; + if (buffer.isDirect()) { + this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex; + this.arrayOffset = -1; + this.array = null; + } else { + this.arrayOffset = srcIndex; + this.array = buffer.array(); + this.memoryAddress = 0L; + } clear(); maxCapacity(length); } else { @@ -61,13 +93,15 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC public void reset() { this.buffer = null; this.memoryAddress = 0L; + this.arrayOffset = -1; + this.array = null; clear(); maxCapacity(0); } @Override public boolean isDirect() { - return true; + return buffer != null ? buffer.isDirect() : false; } @Override @@ -95,22 +129,22 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC @Override public boolean hasArray() { - return false; + return array != null; } @Override public byte[] array() { - throw new UnsupportedOperationException("direct buffer"); + return array; } @Override public int arrayOffset() { - throw new UnsupportedOperationException("direct buffer"); + return arrayOffset; } @Override public boolean hasMemoryAddress() { - return true; + return memoryAddress != 0; } @Override @@ -120,70 +154,156 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC @Override protected byte _getByte(int index) { - return UnsafeByteBufUtil.getByte(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getByte(addr(index)); + } else { + return UnsafeByteBufUtil.getByte(array, idx(index)); + } } @Override protected short _getShort(int index) { - return UnsafeByteBufUtil.getShort(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getShort(addr(index)); + } else { + return UnsafeByteBufUtil.getShort(array, idx(index)); + } } @Override protected short _getShortLE(int index) { - return UnsafeByteBufUtil.getShortLE(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getShortLE(addr(index)); + } else { + return UnsafeByteBufUtil.getShortLE(array, idx(index)); + } } @Override protected int _getUnsignedMedium(int index) { - return UnsafeByteBufUtil.getUnsignedMedium(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getUnsignedMedium(addr(index)); + } else { + return UnsafeByteBufUtil.getUnsignedMedium(array, idx(index)); + } } @Override protected int _getUnsignedMediumLE(int index) { - return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index)); + } else { + return UnsafeByteBufUtil.getUnsignedMediumLE(array, idx(index)); + } } @Override protected int _getInt(int index) { - return UnsafeByteBufUtil.getInt(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getInt(addr(index)); + } else { + return UnsafeByteBufUtil.getInt(array, idx(index)); + } } @Override protected int _getIntLE(int index) { - return UnsafeByteBufUtil.getIntLE(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getIntLE(addr(index)); + } else { + return UnsafeByteBufUtil.getIntLE(array, idx(index)); + } } @Override protected long _getLong(int index) { - return UnsafeByteBufUtil.getLong(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getLong(addr(index)); + } else { + return UnsafeByteBufUtil.getLong(array, idx(index)); + } } @Override protected long _getLongLE(int index) { - return UnsafeByteBufUtil.getLongLE(addr(index)); + if (hasMemoryAddress()) { + return UnsafeByteBufUtil.getLongLE(addr(index)); + } else { + return UnsafeByteBufUtil.getLongLE(array, idx(index)); + } } @Override public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { - UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); + } else { + final int idx = idx(index); + checkDstIndex(idx, length, dstIndex, dst.capacity()); + getBytes(array, idx, dst, dstIndex, length); + } return this; } + private static void getBytes(byte[] array, int idx, ByteBuf dst, int dstIndex, int length) { + if (dst.hasMemoryAddress()) { + PlatformDependent.copyMemory(array, idx, dst.memoryAddress() + dstIndex, length); + } else if (dst.hasArray()) { + System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dstIndex, length); + } else { + dst.setBytes(dstIndex, array, idx, length); + } + } + @Override public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { - UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); + } else { + final int idx = idx(index); + checkDstIndex(idx, length, dstIndex, dst.length); + System.arraycopy(array, idx, dst, dstIndex, length); + } return this; } @Override public ByteBuf getBytes(int index, ByteBuffer dst) { - UnsafeByteBufUtil.getBytes(this, addr(index), index, dst); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.getBytes(this, addr(index), index, dst); + } else { + final int idx = idx(index); + checkIndex(idx, dst.remaining()); + getBytes(array, idx, dst); + } return this; } + private static void getBytes(byte[] array, int idx, ByteBuffer dst) { + if (dst.remaining() == 0) { + return; + } + if (dst.isDirect()) { + if (dst.isReadOnly()) { + // We need to check if dst is ready-only so we not write something in it by using Unsafe. + throw new ReadOnlyBufferException(); + } + // Copy to direct memory + final long dstAddress = PlatformDependent.directBufferAddress(dst); + PlatformDependent.copyMemory(array, idx, dstAddress + dst.position(), dst.remaining()); + dst.position(dst.position() + dst.remaining()); + } else if (dst.hasArray()) { + // Copy to array + System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining()); + dst.position(dst.position() + dst.remaining()); + } else { + dst.put(array, idx, dst.remaining()); + } + } + @Override public ByteBuf readBytes(ByteBuffer dst) { - int length = dst.remaining(); + final int length = dst.remaining(); checkReadableBytes(length); getBytes(readerIndex, dst); readerIndex += length; @@ -192,67 +312,150 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC @Override protected void _setByte(int index, int value) { - UnsafeByteBufUtil.setByte(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setByte(addr(index), value); + } else { + UnsafeByteBufUtil.setByte(array, idx(index), value); + } } @Override protected void _setShort(int index, int value) { - UnsafeByteBufUtil.setShort(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setShort(addr(index), value); + } else { + UnsafeByteBufUtil.setShort(array, idx(index), value); + } } @Override protected void _setShortLE(int index, int value) { - UnsafeByteBufUtil.setShortLE(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setShortLE(addr(index), value); + } else { + UnsafeByteBufUtil.setShortLE(array, idx(index), value); + } } @Override protected void _setMedium(int index, int value) { - UnsafeByteBufUtil.setMedium(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setMedium(addr(index), value); + } else { + UnsafeByteBufUtil.setMedium(array, idx(index), value); + } } @Override protected void _setMediumLE(int index, int value) { - UnsafeByteBufUtil.setMediumLE(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setMediumLE(addr(index), value); + } else { + UnsafeByteBufUtil.setMediumLE(array, idx(index), value); + } } @Override protected void _setInt(int index, int value) { - UnsafeByteBufUtil.setInt(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setInt(addr(index), value); + } else { + UnsafeByteBufUtil.setInt(array, idx(index), value); + } } @Override protected void _setIntLE(int index, int value) { - UnsafeByteBufUtil.setIntLE(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setIntLE(addr(index), value); + } else { + UnsafeByteBufUtil.setIntLE(array, idx(index), value); + } } @Override protected void _setLong(int index, long value) { - UnsafeByteBufUtil.setLong(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setLong(addr(index), value); + } else { + UnsafeByteBufUtil.setLong(array, idx(index), value); + } } @Override protected void _setLongLE(int index, long value) { - UnsafeByteBufUtil.setLongLE(addr(index), value); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setLongLE(addr(index), value); + } else { + UnsafeByteBufUtil.setLongLE(array, idx(index), value); + } } @Override public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { - UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); + } else { + final int idx = idx(index); + checkSrcIndex(idx, length, srcIndex, src.capacity()); + setBytes(array, idx, src, srcIndex, length); + } return this; } + private static void setBytes(byte[] array, int idx, ByteBuf src, int srcIndex, int length) { + if (src.hasMemoryAddress()) { + PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, idx, length); + } else if (src.hasArray()) { + System.arraycopy(src.array(), src.arrayOffset() + srcIndex, array, idx, length); + } else { + src.getBytes(srcIndex, array, idx, length); + } + } + @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { - UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); + } else { + final int idx = idx(index); + checkSrcIndex(idx, length, srcIndex, src.length); + System.arraycopy(src, srcIndex, array, idx, length); + } return this; } @Override public ByteBuf setBytes(int index, ByteBuffer src) { - UnsafeByteBufUtil.setBytes(this, addr(index), index, src); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setBytes(this, addr(index), index, src); + } else { + final int idx = idx(index); + checkSrcIndex(idx, src.remaining(), src.position(), src.capacity()); + setBytes(array, idx(index), src); + } return this; } + private static void setBytes(byte[] array, int idx, ByteBuffer src) { + final int length = src.remaining(); + if (length == 0) { + return; + } + if (src.isDirect()) { + // Copy from direct memory + final long srcAddress = PlatformDependent.directBufferAddress(src); + PlatformDependent.copyMemory(srcAddress + src.position(), array, idx, length); + src.position(src.position() + length); + } else if (src.hasArray()) { + // Copy from array + System.arraycopy(src.array(), src.arrayOffset() + src.position(), array, idx, length); + src.position(src.position() + length); + } else { + src.get(array, idx, src.remaining()); + } + } + @Override @Deprecated public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { @@ -303,7 +506,7 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC @Override public int nioBufferCount() { - return 1; + return buffer == null ? 0 : 1; } @Override @@ -347,6 +550,10 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC return memoryAddress + index; } + private int idx(int index) { + return arrayOffset + index; + } + @Override @Deprecated protected SwappedByteBuf newSwappedByteBuf() { @@ -355,7 +562,12 @@ public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceC @Override public ByteBuf setZero(int index, int length) { - UnsafeByteBufUtil.setZero(this, addr(index), index, length); + if (hasMemoryAddress()) { + UnsafeByteBufUtil.setZero(this, addr(index), index, length); + } else { + //prefer Arrays::fill here? + UnsafeByteBufUtil.setZero(array, idx(index), length); + } return this; }