diff --git a/artemis-features/src/main/resources/artemis.xml b/artemis-features/src/main/resources/artemis.xml index 44f1475433..99db0893b9 100644 --- a/artemis-features/src/main/resources/artemis.xml +++ b/artemis-features/src/main/resources/artemis.xml @@ -59,7 +59,7 @@ under the License. - tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE tcp://0.0.0.0:5672?protocols=AMQP diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/UnpooledUnsafeDirectByteBufWrapper.java deleted file mode 100644 index 1141f35b4d..0000000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/UnpooledUnsafeDirectByteBufWrapper.java +++ /dev/null @@ -1,593 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.io.buffer; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.ReadOnlyBufferException; -import java.nio.channels.FileChannel; -import java.nio.channels.GatheringByteChannel; -import java.nio.channels.ScatteringByteChannel; - -import io.netty.buffer.AbstractReferenceCountedByteBuf; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.SwappedByteBuf; -import io.netty.util.internal.PlatformDependent; - -/** - * A NIO {@link ByteBuffer}, byte[] and address direct access wrapper. - * Only content manipulation operations are supported. - * Is best suited only for encoding/decoding purposes. - */ -public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf { - - private static final byte ZERO = 0; - private ByteBuffer buffer; - private int arrayOffset; - private byte[] array; - private long memoryAddress; - - public UnpooledUnsafeDirectByteBufWrapper() { - super(0); - this.buffer = null; - this.arrayOffset = -1; - this.array = null; - this.memoryAddress = 0L; - } - - public void wrap(long address, int length) { - this.memoryAddress = address; - this.arrayOffset = -1; - this.array = null; - this.buffer = null; - clear(); - maxCapacity(length); - } - - public void wrap(byte[] array, int srcIndex, int length) { - if (array != null) { - this.memoryAddress = 0L; - this.arrayOffset = srcIndex; - this.array = array; - this.buffer = null; - clear(); - maxCapacity(length); - } else { - reset(); - } - } - - public void wrap(ByteBuffer buffer, int srcIndex, int length) { - if (buffer != null) { - this.buffer = buffer; - if (buffer.isDirect()) { - this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex; - this.arrayOffset = -1; - this.array = null; - } else { - this.arrayOffset = srcIndex; - this.array = buffer.array(); - this.memoryAddress = 0L; - } - clear(); - maxCapacity(length); - } else { - reset(); - } - } - - public void reset() { - this.buffer = null; - this.memoryAddress = 0L; - this.arrayOffset = -1; - this.array = null; - clear(); - maxCapacity(0); - } - - @Override - public boolean isDirect() { - return buffer != null ? buffer.isDirect() : false; - } - - @Override - public int capacity() { - return maxCapacity(); - } - - @Override - public ByteBuf capacity(int newCapacity) { - if (newCapacity != maxCapacity()) { - throw new IllegalArgumentException("can't set a capacity different from the max allowed one"); - } - return this; - } - - @Override - public ByteBufAllocator alloc() { - return null; - } - - @Override - public ByteOrder order() { - return ByteOrder.BIG_ENDIAN; - } - - @Override - public boolean hasArray() { - return array != null; - } - - @Override - public byte[] array() { - return array; - } - - @Override - public int arrayOffset() { - return arrayOffset; - } - - @Override - public boolean hasMemoryAddress() { - return memoryAddress != 0; - } - - @Override - public long memoryAddress() { - return memoryAddress; - } - - @Override - protected byte _getByte(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getByte(addr(index)); - } else { - return UnsafeByteBufUtil.getByte(array, idx(index)); - } - } - - @Override - protected short _getShort(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getShort(addr(index)); - } else { - return UnsafeByteBufUtil.getShort(array, idx(index)); - } - } - - @Override - protected short _getShortLE(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getShortLE(addr(index)); - } else { - return UnsafeByteBufUtil.getShortLE(array, idx(index)); - } - } - - @Override - protected int _getUnsignedMedium(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getUnsignedMedium(addr(index)); - } else { - return UnsafeByteBufUtil.getUnsignedMedium(array, idx(index)); - } - } - - @Override - protected int _getUnsignedMediumLE(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index)); - } else { - return UnsafeByteBufUtil.getUnsignedMediumLE(array, idx(index)); - } - } - - @Override - protected int _getInt(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getInt(addr(index)); - } else { - return UnsafeByteBufUtil.getInt(array, idx(index)); - } - } - - @Override - protected int _getIntLE(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getIntLE(addr(index)); - } else { - return UnsafeByteBufUtil.getIntLE(array, idx(index)); - } - } - - @Override - protected long _getLong(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getLong(addr(index)); - } else { - return UnsafeByteBufUtil.getLong(array, idx(index)); - } - } - - @Override - protected long _getLongLE(int index) { - if (hasMemoryAddress()) { - return UnsafeByteBufUtil.getLongLE(addr(index)); - } else { - return UnsafeByteBufUtil.getLongLE(array, idx(index)); - } - } - - @Override - public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); - } else { - final int idx = idx(index); - checkDstIndex(idx, length, dstIndex, dst.capacity()); - getBytes(array, idx, dst, dstIndex, length); - } - return this; - } - - private static void getBytes(byte[] array, int idx, ByteBuf dst, int dstIndex, int length) { - if (dst.hasMemoryAddress()) { - PlatformDependent.copyMemory(array, idx, dst.memoryAddress() + dstIndex, length); - } else if (dst.hasArray()) { - System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dstIndex, length); - } else { - dst.setBytes(dstIndex, array, idx, length); - } - } - - @Override - public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); - } else { - final int idx = idx(index); - checkDstIndex(idx, length, dstIndex, dst.length); - System.arraycopy(array, idx, dst, dstIndex, length); - } - return this; - } - - @Override - public ByteBuf getBytes(int index, ByteBuffer dst) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.getBytes(this, addr(index), index, dst); - } else { - final int idx = idx(index); - checkIndex(idx, dst.remaining()); - getBytes(array, idx, dst); - } - return this; - } - - private static void getBytes(byte[] array, int idx, ByteBuffer dst) { - if (dst.remaining() == 0) { - return; - } - if (dst.isDirect()) { - if (dst.isReadOnly()) { - // We need to check if dst is ready-only so we not write something in it by using Unsafe. - throw new ReadOnlyBufferException(); - } - // Copy to direct memory - final long dstAddress = PlatformDependent.directBufferAddress(dst); - PlatformDependent.copyMemory(array, idx, dstAddress + dst.position(), dst.remaining()); - dst.position(dst.position() + dst.remaining()); - } else if (dst.hasArray()) { - // Copy to array - System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining()); - dst.position(dst.position() + dst.remaining()); - } else { - dst.put(array, idx, dst.remaining()); - } - } - - @Override - public ByteBuf readBytes(ByteBuffer dst) { - final int length = dst.remaining(); - checkReadableBytes(length); - int rIndex = readerIndex(); - getBytes(rIndex, dst); - readerIndex(rIndex + length); - return this; - } - - @Override - protected void _setByte(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setByte(addr(index), value); - } else { - UnsafeByteBufUtil.setByte(array, idx(index), value); - } - } - - @Override - protected void _setShort(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setShort(addr(index), value); - } else { - UnsafeByteBufUtil.setShort(array, idx(index), value); - } - } - - @Override - protected void _setShortLE(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setShortLE(addr(index), value); - } else { - UnsafeByteBufUtil.setShortLE(array, idx(index), value); - } - } - - @Override - protected void _setMedium(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setMedium(addr(index), value); - } else { - UnsafeByteBufUtil.setMedium(array, idx(index), value); - } - } - - @Override - protected void _setMediumLE(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setMediumLE(addr(index), value); - } else { - UnsafeByteBufUtil.setMediumLE(array, idx(index), value); - } - } - - @Override - protected void _setInt(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setInt(addr(index), value); - } else { - UnsafeByteBufUtil.setInt(array, idx(index), value); - } - } - - @Override - protected void _setIntLE(int index, int value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setIntLE(addr(index), value); - } else { - UnsafeByteBufUtil.setIntLE(array, idx(index), value); - } - } - - @Override - protected void _setLong(int index, long value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setLong(addr(index), value); - } else { - UnsafeByteBufUtil.setLong(array, idx(index), value); - } - } - - @Override - protected void _setLongLE(int index, long value) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setLongLE(addr(index), value); - } else { - UnsafeByteBufUtil.setLongLE(array, idx(index), value); - } - } - - @Override - public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); - } else { - final int idx = idx(index); - checkSrcIndex(idx, length, srcIndex, src.capacity()); - setBytes(array, idx, src, srcIndex, length); - } - return this; - } - - private static void setBytes(byte[] array, int idx, ByteBuf src, int srcIndex, int length) { - if (src.hasMemoryAddress()) { - PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, idx, length); - } else if (src.hasArray()) { - System.arraycopy(src.array(), src.arrayOffset() + srcIndex, array, idx, length); - } else { - src.getBytes(srcIndex, array, idx, length); - } - } - - @Override - public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); - } else { - final int idx = idx(index); - checkSrcIndex(idx, length, srcIndex, src.length); - System.arraycopy(src, srcIndex, array, idx, length); - } - return this; - } - - @Override - public ByteBuf setBytes(int index, ByteBuffer src) { - if (hasMemoryAddress()) { - UnsafeByteBufUtil.setBytes(this, addr(index), index, src); - } else { - final int idx = idx(index); - checkSrcIndex(idx, src.remaining(), src.position(), src.capacity()); - setBytes(array, idx(index), src); - } - return this; - } - - private static void setBytes(byte[] array, int idx, ByteBuffer src) { - final int length = src.remaining(); - if (length == 0) { - return; - } - if (src.isDirect()) { - // Copy from direct memory - final long srcAddress = PlatformDependent.directBufferAddress(src); - PlatformDependent.copyMemory(srcAddress + src.position(), array, idx, length); - src.position(src.position() + length); - } else if (src.hasArray()) { - // Copy from array - System.arraycopy(src.array(), src.arrayOffset() + src.position(), array, idx, length); - src.position(src.position() + length); - } else { - src.get(array, idx, src.remaining()); - } - } - - @Override - @Deprecated - public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int getBytes(int index, FileChannel out, long position, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int readBytes(GatheringByteChannel out, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int readBytes(FileChannel out, long position, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int setBytes(int index, InputStream in, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public int setBytes(int index, FileChannel in, long position, int length) throws IOException { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - public int nioBufferCount() { - return buffer == null ? 0 : 1; - } - - @Override - @Deprecated - public ByteBuffer[] nioBuffers(int index, int length) { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - public ByteBuf copy(int index, int length) { - - throw new UnsupportedOperationException("unsupported!"); - - } - - @Override - @Deprecated - public ByteBuffer internalNioBuffer(int index, int length) { - throw new UnsupportedOperationException("cannot access directly the wrapped buffer!"); - } - - @Override - @Deprecated - public ByteBuffer nioBuffer(int index, int length) { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - @Deprecated - protected void deallocate() { - //NO_OP - } - - @Override - public ByteBuf unwrap() { - return null; - } - - private long addr(int index) { - return memoryAddress + index; - } - - private int idx(int index) { - return arrayOffset + index; - } - - @Override - @Deprecated - protected SwappedByteBuf newSwappedByteBuf() { - throw new UnsupportedOperationException("unsupported!"); - } - - @Override - public ByteBuf setZero(int index, int length) { - if (hasMemoryAddress()) { - if (length == 0) { - return this; - } - this.checkIndex(index, length); - PlatformDependent.setMemory(addr(index), length, ZERO); - } else { - //prefer Arrays::fill here? - UnsafeByteBufUtil.setZero(array, idx(index), length); - } - return this; - } - - @Override - public ByteBuf writeZero(int length) { - ensureWritable(length); - int wIndex = writerIndex(); - setZero(wIndex, length); - writerIndex(wIndex + length); - return this; - } -} - diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/UnsafeByteBufUtil.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/UnsafeByteBufUtil.java deleted file mode 100644 index 9b938a0947..0000000000 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/UnsafeByteBufUtil.java +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* Copyright 2015 The Netty Project */ -package org.apache.activemq.artemis.core.io.buffer; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.ReadOnlyBufferException; - -import io.netty.buffer.AbstractByteBuf; -import io.netty.buffer.ByteBuf; -import io.netty.util.IllegalReferenceCountException; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.SystemPropertyUtil; - -import static io.netty.util.internal.MathUtil.isOutOfBounds; -import static io.netty.util.internal.ObjectUtil.checkNotNull; -import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER; - -/** - * All operations get and set as {@link ByteOrder#BIG_ENDIAN}. - */ -final class UnsafeByteBufUtil { - - private static final boolean UNALIGNED = PlatformDependent.isUnaligned(); - - private static final byte ZERO = 0; - - static byte getByte(long address) { - return PlatformDependent.getByte(address); - } - - static short getShort(long address) { - if (UNALIGNED) { - short v = PlatformDependent.getShort(address); - return BIG_ENDIAN_NATIVE_ORDER ? v : Short.reverseBytes(v); - } - return (short) (PlatformDependent.getByte(address) << 8 | PlatformDependent.getByte(address + 1) & 0xff); - } - - static short getShortLE(long address) { - if (UNALIGNED) { - short v = PlatformDependent.getShort(address); - return BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(v) : v; - } - return (short) (PlatformDependent.getByte(address) & 0xff | PlatformDependent.getByte(address + 1) << 8); - } - - static int getUnsignedMedium(long address) { - if (UNALIGNED) { - return (PlatformDependent.getByte(address) & 0xff) << 16 | (BIG_ENDIAN_NATIVE_ORDER ? PlatformDependent.getShort(address + 1) : Short.reverseBytes(PlatformDependent.getShort(address + 1))) & 0xffff; - } - return (PlatformDependent.getByte(address) & 0xff) << 16 | (PlatformDependent.getByte(address + 1) & 0xff) << 8 | PlatformDependent.getByte(address + 2) & 0xff; - } - - static int getUnsignedMediumLE(long address) { - if (UNALIGNED) { - return (PlatformDependent.getByte(address) & 0xff) | ((BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(PlatformDependent.getShort(address + 1)) : PlatformDependent.getShort(address + 1)) & 0xffff) << 8; - } - return PlatformDependent.getByte(address) & 0xff | (PlatformDependent.getByte(address + 1) & 0xff) << 8 | (PlatformDependent.getByte(address + 2) & 0xff) << 16; - } - - static int getInt(long address) { - if (UNALIGNED) { - int v = PlatformDependent.getInt(address); - return BIG_ENDIAN_NATIVE_ORDER ? v : Integer.reverseBytes(v); - } - return PlatformDependent.getByte(address) << 24 | (PlatformDependent.getByte(address + 1) & 0xff) << 16 | (PlatformDependent.getByte(address + 2) & 0xff) << 8 | PlatformDependent.getByte(address + 3) & 0xff; - } - - static int getIntLE(long address) { - if (UNALIGNED) { - int v = PlatformDependent.getInt(address); - return BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(v) : v; - } - return PlatformDependent.getByte(address) & 0xff | (PlatformDependent.getByte(address + 1) & 0xff) << 8 | (PlatformDependent.getByte(address + 2) & 0xff) << 16 | PlatformDependent.getByte(address + 3) << 24; - } - - static long getLong(long address) { - if (UNALIGNED) { - long v = PlatformDependent.getLong(address); - return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v); - } - return ((long) PlatformDependent.getByte(address)) << 56 | (PlatformDependent.getByte(address + 1) & 0xffL) << 48 | (PlatformDependent.getByte(address + 2) & 0xffL) << 40 | (PlatformDependent.getByte(address + 3) & 0xffL) << 32 | (PlatformDependent.getByte(address + 4) & 0xffL) << 24 | (PlatformDependent.getByte(address + 5) & 0xffL) << 16 | (PlatformDependent.getByte(address + 6) & 0xffL) << 8 | (PlatformDependent.getByte(address + 7)) & 0xffL; - } - - static long getLongLE(long address) { - if (UNALIGNED) { - long v = PlatformDependent.getLong(address); - return BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(v) : v; - } - return (PlatformDependent.getByte(address)) & 0xffL | (PlatformDependent.getByte(address + 1) & 0xffL) << 8 | (PlatformDependent.getByte(address + 2) & 0xffL) << 16 | (PlatformDependent.getByte(address + 3) & 0xffL) << 24 | (PlatformDependent.getByte(address + 4) & 0xffL) << 32 | (PlatformDependent.getByte(address + 5) & 0xffL) << 40 | (PlatformDependent.getByte(address + 6) & 0xffL) << 48 | ((long) PlatformDependent.getByte(address + 7)) << 56; - } - - static void setByte(long address, int value) { - PlatformDependent.putByte(address, (byte) value); - } - - static void setShort(long address, int value) { - if (UNALIGNED) { - PlatformDependent.putShort(address, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value)); - } else { - PlatformDependent.putByte(address, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 1, (byte) value); - } - } - - static void setShortLE(long address, int value) { - if (UNALIGNED) { - PlatformDependent.putShort(address, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) value) : (short) value); - } else { - PlatformDependent.putByte(address, (byte) value); - PlatformDependent.putByte(address + 1, (byte) (value >>> 8)); - } - } - - static void setMedium(long address, int value) { - PlatformDependent.putByte(address, (byte) (value >>> 16)); - if (UNALIGNED) { - PlatformDependent.putShort(address + 1, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value)); - } else { - PlatformDependent.putByte(address + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 2, (byte) value); - } - } - - static void setMediumLE(long address, int value) { - PlatformDependent.putByte(address, (byte) value); - if (UNALIGNED) { - PlatformDependent.putShort(address + 1, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) (value >>> 8)) : (short) (value >>> 8)); - } else { - PlatformDependent.putByte(address + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 2, (byte) (value >>> 16)); - } - } - - static void setInt(long address, int value) { - if (UNALIGNED) { - PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value)); - } else { - PlatformDependent.putByte(address, (byte) (value >>> 24)); - PlatformDependent.putByte(address + 1, (byte) (value >>> 16)); - PlatformDependent.putByte(address + 2, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 3, (byte) value); - } - } - - static void setIntLE(long address, int value) { - if (UNALIGNED) { - PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(value) : value); - } else { - PlatformDependent.putByte(address, (byte) value); - PlatformDependent.putByte(address + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 2, (byte) (value >>> 16)); - PlatformDependent.putByte(address + 3, (byte) (value >>> 24)); - } - } - - static void setLong(long address, long value) { - if (UNALIGNED) { - PlatformDependent.putLong(address, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value)); - } else { - PlatformDependent.putByte(address, (byte) (value >>> 56)); - PlatformDependent.putByte(address + 1, (byte) (value >>> 48)); - PlatformDependent.putByte(address + 2, (byte) (value >>> 40)); - PlatformDependent.putByte(address + 3, (byte) (value >>> 32)); - PlatformDependent.putByte(address + 4, (byte) (value >>> 24)); - PlatformDependent.putByte(address + 5, (byte) (value >>> 16)); - PlatformDependent.putByte(address + 6, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 7, (byte) value); - } - } - - static void setLongLE(long address, long value) { - if (UNALIGNED) { - PlatformDependent.putLong(address, BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(value) : value); - } else { - PlatformDependent.putByte(address, (byte) value); - PlatformDependent.putByte(address + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(address + 2, (byte) (value >>> 16)); - PlatformDependent.putByte(address + 3, (byte) (value >>> 24)); - PlatformDependent.putByte(address + 4, (byte) (value >>> 32)); - PlatformDependent.putByte(address + 5, (byte) (value >>> 40)); - PlatformDependent.putByte(address + 6, (byte) (value >>> 48)); - PlatformDependent.putByte(address + 7, (byte) (value >>> 56)); - } - } - - static byte getByte(byte[] array, int index) { - return PlatformDependent.getByte(array, index); - } - - static short getShort(byte[] array, int index) { - if (UNALIGNED) { - short v = PlatformDependent.getShort(array, index); - return BIG_ENDIAN_NATIVE_ORDER ? v : Short.reverseBytes(v); - } - return (short) (PlatformDependent.getByte(array, index) << 8 | PlatformDependent.getByte(array, index + 1) & 0xff); - } - - static short getShortLE(byte[] array, int index) { - if (UNALIGNED) { - short v = PlatformDependent.getShort(array, index); - return BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(v) : v; - } - return (short) (PlatformDependent.getByte(array, index) & 0xff | PlatformDependent.getByte(array, index + 1) << 8); - } - - static int getUnsignedMedium(byte[] array, int index) { - if (UNALIGNED) { - return (PlatformDependent.getByte(array, index) & 0xff) << 16 | (BIG_ENDIAN_NATIVE_ORDER ? PlatformDependent.getShort(array, index + 1) : Short.reverseBytes(PlatformDependent.getShort(array, index + 1))) & 0xffff; - } - return (PlatformDependent.getByte(array, index) & 0xff) << 16 | (PlatformDependent.getByte(array, index + 1) & 0xff) << 8 | PlatformDependent.getByte(array, index + 2) & 0xff; - } - - static int getUnsignedMediumLE(byte[] array, int index) { - if (UNALIGNED) { - return (PlatformDependent.getByte(array, index) & 0xff) | ((BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(PlatformDependent.getShort(array, index + 1)) : PlatformDependent.getShort(array, index + 1)) & 0xffff) << 8; - } - return PlatformDependent.getByte(array, index) & 0xff | (PlatformDependent.getByte(array, index + 1) & 0xff) << 8 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 16; - } - - static int getInt(byte[] array, int index) { - if (UNALIGNED) { - int v = PlatformDependent.getInt(array, index); - return BIG_ENDIAN_NATIVE_ORDER ? v : Integer.reverseBytes(v); - } - return PlatformDependent.getByte(array, index) << 24 | (PlatformDependent.getByte(array, index + 1) & 0xff) << 16 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 8 | PlatformDependent.getByte(array, index + 3) & 0xff; - } - - static int getIntLE(byte[] array, int index) { - if (UNALIGNED) { - int v = PlatformDependent.getInt(array, index); - return BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(v) : v; - } - return PlatformDependent.getByte(array, index) & 0xff | (PlatformDependent.getByte(array, index + 1) & 0xff) << 8 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 16 | PlatformDependent.getByte(array, index + 3) << 24; - } - - static long getLong(byte[] array, int index) { - if (UNALIGNED) { - long v = PlatformDependent.getLong(array, index); - return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v); - } - return ((long) PlatformDependent.getByte(array, index)) << 56 | (PlatformDependent.getByte(array, index + 1) & 0xffL) << 48 | (PlatformDependent.getByte(array, index + 2) & 0xffL) << 40 | (PlatformDependent.getByte(array, index + 3) & 0xffL) << 32 | (PlatformDependent.getByte(array, index + 4) & 0xffL) << 24 | (PlatformDependent.getByte(array, index + 5) & 0xffL) << 16 | (PlatformDependent.getByte(array, index + 6) & 0xffL) << 8 | (PlatformDependent.getByte(array, index + 7)) & 0xffL; - } - - static long getLongLE(byte[] array, int index) { - if (UNALIGNED) { - long v = PlatformDependent.getLong(array, index); - return BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(v) : v; - } - return PlatformDependent.getByte(array, index) & 0xffL | (PlatformDependent.getByte(array, index + 1) & 0xffL) << 8 | (PlatformDependent.getByte(array, index + 2) & 0xffL) << 16 | (PlatformDependent.getByte(array, index + 3) & 0xffL) << 24 | (PlatformDependent.getByte(array, index + 4) & 0xffL) << 32 | (PlatformDependent.getByte(array, index + 5) & 0xffL) << 40 | (PlatformDependent.getByte(array, index + 6) & 0xffL) << 48 | ((long) PlatformDependent.getByte(array, index + 7)) << 56; - } - - static void setByte(byte[] array, int index, int value) { - PlatformDependent.putByte(array, index, (byte) value); - } - - static void setShort(byte[] array, int index, int value) { - if (UNALIGNED) { - PlatformDependent.putShort(array, index, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value)); - } else { - PlatformDependent.putByte(array, index, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 1, (byte) value); - } - } - - static void setShortLE(byte[] array, int index, int value) { - if (UNALIGNED) { - PlatformDependent.putShort(array, index, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) value) : (short) value); - } else { - PlatformDependent.putByte(array, index, (byte) value); - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8)); - } - } - - static void setMedium(byte[] array, int index, int value) { - PlatformDependent.putByte(array, index, (byte) (value >>> 16)); - if (UNALIGNED) { - PlatformDependent.putShort(array, index + 1, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value)); - } else { - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 2, (byte) value); - } - } - - static void setMediumLE(byte[] array, int index, int value) { - PlatformDependent.putByte(array, index, (byte) value); - if (UNALIGNED) { - PlatformDependent.putShort(array, index + 1, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) (value >>> 8)) : (short) (value >>> 8)); - } else { - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 2, (byte) (value >>> 16)); - } - } - - static void setInt(byte[] array, int index, int value) { - if (UNALIGNED) { - PlatformDependent.putInt(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value)); - } else { - PlatformDependent.putByte(array, index, (byte) (value >>> 24)); - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 16)); - PlatformDependent.putByte(array, index + 2, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 3, (byte) value); - } - } - - static void setIntLE(byte[] array, int index, int value) { - if (UNALIGNED) { - PlatformDependent.putInt(array, index, BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(value) : value); - } else { - PlatformDependent.putByte(array, index, (byte) value); - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 2, (byte) (value >>> 16)); - PlatformDependent.putByte(array, index + 3, (byte) (value >>> 24)); - } - } - - static void setLong(byte[] array, int index, long value) { - if (UNALIGNED) { - PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value)); - } else { - PlatformDependent.putByte(array, index, (byte) (value >>> 56)); - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 48)); - PlatformDependent.putByte(array, index + 2, (byte) (value >>> 40)); - PlatformDependent.putByte(array, index + 3, (byte) (value >>> 32)); - PlatformDependent.putByte(array, index + 4, (byte) (value >>> 24)); - PlatformDependent.putByte(array, index + 5, (byte) (value >>> 16)); - PlatformDependent.putByte(array, index + 6, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 7, (byte) value); - } - } - - static void setLongLE(byte[] array, int index, long value) { - if (UNALIGNED) { - PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(value) : value); - } else { - PlatformDependent.putByte(array, index, (byte) value); - PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8)); - PlatformDependent.putByte(array, index + 2, (byte) (value >>> 16)); - PlatformDependent.putByte(array, index + 3, (byte) (value >>> 24)); - PlatformDependent.putByte(array, index + 4, (byte) (value >>> 32)); - PlatformDependent.putByte(array, index + 5, (byte) (value >>> 40)); - PlatformDependent.putByte(array, index + 6, (byte) (value >>> 48)); - PlatformDependent.putByte(array, index + 7, (byte) (value >>> 56)); - } - } - - static void setZero(byte[] array, int index, int length) { - if (length == 0) { - return; - } - PlatformDependent.setMemory(array, index, length, ZERO); - } - - static ByteBuf copy(AbstractByteBuf buf, long addr, int index, int length) { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - ByteBuf copy = buf.alloc().directBuffer(length, buf.maxCapacity()); - if (length != 0) { - if (copy.hasMemoryAddress()) { - PlatformDependent.copyMemory(addr, copy.memoryAddress(), length); - copy.setIndex(0, length); - } else { - copy.writeBytes(buf, index, length); - } - } - return copy; - } - - static int setBytes(AbstractByteBuf buf, long addr, int index, InputStream in, int length) throws IOException { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - ByteBuf tmpBuf = buf.alloc().heapBuffer(length); - try { - byte[] tmp = tmpBuf.array(); - int offset = tmpBuf.arrayOffset(); - int readBytes = in.read(tmp, offset, length); - if (readBytes > 0) { - PlatformDependent.copyMemory(tmp, offset, addr, readBytes); - } - return readBytes; - } finally { - tmpBuf.release(); - } - } - - static void getBytes(AbstractByteBuf buf, long addr, int index, ByteBuf dst, int dstIndex, int length) { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - checkNotNull(dst, "dst"); - if (isOutOfBounds(dstIndex, length, dst.capacity())) { - throw new IndexOutOfBoundsException("dstIndex: " + dstIndex); - } - - if (dst.hasMemoryAddress()) { - PlatformDependent.copyMemory(addr, dst.memoryAddress() + dstIndex, length); - } else if (dst.hasArray()) { - PlatformDependent.copyMemory(addr, dst.array(), dst.arrayOffset() + dstIndex, length); - } else { - dst.setBytes(dstIndex, buf, index, length); - } - } - - static void getBytes(AbstractByteBuf buf, long addr, int index, byte[] dst, int dstIndex, int length) { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - checkNotNull(dst, "dst"); - if (isOutOfBounds(dstIndex, length, dst.length)) { - throw new IndexOutOfBoundsException("dstIndex: " + dstIndex); - } - if (length != 0) { - PlatformDependent.copyMemory(addr, dst, dstIndex, length); - } - } - - static void getBytes(AbstractByteBuf buf, long addr, int index, ByteBuffer dst) { - //modified to use this classes checkIndex - checkIndex(buf, index, dst.remaining()); - if (dst.remaining() == 0) { - return; - } - - if (dst.isDirect()) { - if (dst.isReadOnly()) { - // We need to check if dst is ready-only so we not write something in it by using Unsafe. - throw new ReadOnlyBufferException(); - } - // Copy to direct memory - long dstAddress = PlatformDependent.directBufferAddress(dst); - PlatformDependent.copyMemory(addr, dstAddress + dst.position(), dst.remaining()); - dst.position(dst.position() + dst.remaining()); - } else if (dst.hasArray()) { - // Copy to array - PlatformDependent.copyMemory(addr, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining()); - dst.position(dst.position() + dst.remaining()); - } else { - dst.put(buf.nioBuffer()); - } - } - - static void setBytes(AbstractByteBuf buf, long addr, int index, ByteBuf src, int srcIndex, int length) { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - checkNotNull(src, "src"); - if (isOutOfBounds(srcIndex, length, src.capacity())) { - throw new IndexOutOfBoundsException("srcIndex: " + srcIndex); - } - - if (length != 0) { - if (src.hasMemoryAddress()) { - PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr, length); - } else if (src.hasArray()) { - PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr, length); - } else { - src.getBytes(srcIndex, buf, index, length); - } - } - } - - static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - if (length != 0) { - PlatformDependent.copyMemory(src, srcIndex, addr, length); - } - } - - static void setBytes(AbstractByteBuf buf, long addr, int index, ByteBuffer src) { - //modified to use this classes checkIndex - checkIndex(buf, index, src.remaining()); - - int length = src.remaining(); - if (length == 0) { - return; - } - - if (src.isDirect()) { - // Copy from direct memory - long srcAddress = PlatformDependent.directBufferAddress(src); - PlatformDependent.copyMemory(srcAddress + src.position(), addr, src.remaining()); - src.position(src.position() + length); - } else if (src.hasArray()) { - // Copy from array - PlatformDependent.copyMemory(src.array(), src.arrayOffset() + src.position(), addr, length); - src.position(src.position() + length); - } else { - ByteBuf tmpBuf = buf.alloc().heapBuffer(length); - try { - byte[] tmp = tmpBuf.array(); - src.get(tmp, tmpBuf.arrayOffset(), length); // moves the src position too - PlatformDependent.copyMemory(tmp, tmpBuf.arrayOffset(), addr, length); - } finally { - tmpBuf.release(); - } - } - } - - static void getBytes(AbstractByteBuf buf, long addr, int index, OutputStream out, int length) throws IOException { - //modified to use this classes checkIndex - checkIndex(buf, index, length); - if (length != 0) { - ByteBuf tmpBuf = buf.alloc().heapBuffer(length); - try { - byte[] tmp = tmpBuf.array(); - int offset = tmpBuf.arrayOffset(); - PlatformDependent.copyMemory(addr, tmp, offset, length); - out.write(tmp, offset, length); - } finally { - tmpBuf.release(); - } - } - } - - static void setZero(long addr, int length) { - if (length == 0) { - return; - } - - PlatformDependent.setMemory(addr, length, ZERO); - } - - //removed newUnsafeDirectByteBuf method - - //add a modified copy of checkIndex and ensureAccessible methods from ByteBuf (because we don't have package private access) - - private static void checkIndex(ByteBuf byteBuf, int index) { - checkIndex(byteBuf, index, 1); - } - - private static void checkIndex(ByteBuf byteBuf, int index, int fieldLength) { - ensureAccessible(byteBuf); - checkIndex0(byteBuf, index, fieldLength); - } - - private static void ensureAccessible(ByteBuf byteBuf) { - if (SystemPropertyUtil.getBoolean("io.netty.buffer.bytebuf.checkAccessible", true) && byteBuf.refCnt() == 0) { - throw new IllegalReferenceCountException(0); - } - } - - private static void checkIndex0(ByteBuf byteBuf, int index, int fieldLength) { - if (isOutOfBounds(index, fieldLength, byteBuf.capacity())) { - throw new IndexOutOfBoundsException(String.format("index: %d, length: %d (expected: range(0, %d))", index, fieldLength, byteBuf.capacity())); - - } - } -} \ No newline at end of file diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index 09adf40ca5..782233d032 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -24,9 +24,9 @@ import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.artemis.core.io.buffer.UnpooledUnsafeDirectByteBufWrapper; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.utils.Env; @@ -36,7 +36,7 @@ final class MappedFile implements AutoCloseable { private final MappedByteBuffer buffer; private final FileChannel channel; private final long address; - private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper; + private final ByteBuf byteBufWrapper; private final ChannelBufferWrapper channelBufferWrapper; private int position; private int length; @@ -46,7 +46,7 @@ final class MappedFile implements AutoCloseable { this.buffer = byteBuffer; this.position = position; this.length = length; - this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper(); + this.byteBufWrapper = Unpooled.wrappedBuffer(buffer); this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false); this.address = PlatformDependent.directBufferAddress(buffer); } @@ -175,13 +175,10 @@ final class MappedFile implements AutoCloseable { */ public void write(EncodingSupport encodingSupport) throws IOException { final int encodedSize = encodingSupport.getEncodeSize(); - this.byteBufWrapper.wrap(this.buffer, this.position, encodedSize); - try { - encodingSupport.encode(this.channelBufferWrapper); - } finally { - this.byteBufWrapper.reset(); - } + this.byteBufWrapper.setIndex(this.position, this.position); + encodingSupport.encode(this.channelBufferWrapper); position += encodedSize; + assert (byteBufWrapper.writerIndex() == position); if (position > this.length) { this.length = position; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index ce8f3640bd..caabf46a83 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -34,7 +35,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.buffer.UnpooledUnsafeDirectByteBufWrapper; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -89,9 +89,6 @@ public final class Page implements Comparable { private boolean canBeMapped; - private final ActiveMQBuffer activeMQBuffer; - private final UnpooledUnsafeDirectByteBufWrapper unsafeByteBufWrapper; - public Page(final SimpleString storeName, final StorageManager storageManager, final SequentialFileFactory factory, @@ -104,8 +101,6 @@ public final class Page implements Comparable { this.storeName = storeName; this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory; //pooled buffers to avoid allocations on hot paths - this.unsafeByteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper(); - this.activeMQBuffer = new ChannelBufferWrapper(this.unsafeByteBufWrapper); } public int getPageId() { @@ -152,22 +147,20 @@ public final class Page implements Comparable { file.read(buffer); buffer.rewind(); assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file"; - this.unsafeByteBufWrapper.wrap(buffer, 0, fileSize); - try { - this.activeMQBuffer.clear(); - this.activeMQBuffer.writerIndex(fileSize); - read(storage, this.activeMQBuffer, messages); - } finally { - this.unsafeByteBufWrapper.reset(); - } + ChannelBufferWrapper activeMQBuffer = wrapBuffer(fileSize, buffer); + read(storage, activeMQBuffer, messages); } finally { this.fileFactory.releaseBuffer(buffer); } } + private ChannelBufferWrapper wrapBuffer(int fileSize, ByteBuffer buffer) { + ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer)); + return activeMQBuffer; + } + private static MappedByteBuffer mapFileForRead(File file, int fileSize) { - try (RandomAccessFile raf = new RandomAccessFile(file, "rw"); - FileChannel channel = raf.getChannel()) { + try (RandomAccessFile raf = new RandomAccessFile(file, "rw"); FileChannel channel = raf.getChannel()) { return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize); } catch (Exception e) { throw new IllegalStateException(e); @@ -179,13 +172,10 @@ public final class Page implements Comparable { //use a readonly mapped view of the file final int mappedSize = size.get(); final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize); - this.unsafeByteBufWrapper.wrap(mappedByteBuffer, 0, mappedSize); + ChannelBufferWrapper activeMQBuffer = wrapBuffer(mappedSize, mappedByteBuffer); try { - this.activeMQBuffer.clear(); - this.activeMQBuffer.writerIndex(mappedSize); - return read(storage, this.activeMQBuffer, messages); + return read(storage, activeMQBuffer, messages); } finally { - this.unsafeByteBufWrapper.reset(); //unmap the file after read it to avoid GC to take care of it PlatformDependent.freeDirectBuffer(mappedByteBuffer); } @@ -238,27 +228,23 @@ public final class Page implements Comparable { final int messageEncodedSize = message.getEncodeSize(); final int bufferSize = messageEncodedSize + Page.SIZE_RECORD; final ByteBuffer buffer = fileFactory.newBuffer(bufferSize); - this.unsafeByteBufWrapper.wrap(buffer, 0, bufferSize); - try { - this.activeMQBuffer.clear(); - this.activeMQBuffer.writeByte(Page.START_BYTE); - this.activeMQBuffer.writeInt(messageEncodedSize); - message.encode(this.activeMQBuffer); - this.activeMQBuffer.writeByte(Page.END_BYTE); - assert (this.activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected"; - //buffer limit and position are the same - assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed"; - file.writeDirect(buffer, false); - if (pageCache != null) { - pageCache.addLiveMessage(message); - } - //lighter than addAndGet when single writer - numberOfMessages.lazySet(numberOfMessages.get() + 1); - size.lazySet(size.get() + bufferSize); - storageManager.pageWrite(message, pageId); - } finally { - this.unsafeByteBufWrapper.reset(); + ChannelBufferWrapper activeMQBuffer = wrapBuffer(bufferSize, buffer); + activeMQBuffer.clear(); + activeMQBuffer.writeByte(Page.START_BYTE); + activeMQBuffer.writeInt(messageEncodedSize); + message.encode(activeMQBuffer); + activeMQBuffer.writeByte(Page.END_BYTE); + assert (activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected"; + //buffer limit and position are the same + assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed"; + file.writeDirect(buffer, false); + if (pageCache != null) { + pageCache.addLiveMessage(message); } + //lighter than addAndGet when single writer + numberOfMessages.lazySet(numberOfMessages.get() + 1); + size.lazySet(size.get() + bufferSize); + storageManager.pageWrite(message, pageId); } public void sync() throws Exception { @@ -315,7 +301,7 @@ public final class Page implements Comparable { if (messages != null) { for (PagedMessage msg : messages) { - if (msg.getMessage() instanceof ICoreMessage && (msg.getMessage()).isLargeMessage()) { + if (msg.getMessage() instanceof ICoreMessage && (msg.getMessage()).isLargeMessage()) { LargeServerMessage lmsg = (LargeServerMessage) msg.getMessage(); // Remember, cannot call delete directly here