ARTEMIS-1397 Removing Netty Copied classes
instead of duplicating a buffer from Netty, this will use an existing Wrapped Unpooled Buffer Which will in turn use Unsafe Properly.
This commit is contained in:
parent
69e52eacee
commit
bb554e5264
|
@ -59,7 +59,7 @@ under the License.
|
||||||
<acceptors>
|
<acceptors>
|
||||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports Core, OpenWire, Stomp and AMQP. -->
|
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports Core, OpenWire, Stomp and AMQP. -->
|
||||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE</acceptor>
|
||||||
|
|
||||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||||
|
|
|
@ -1,593 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.activemq.artemis.core.io.buffer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.ByteOrder;
|
|
||||||
import java.nio.ReadOnlyBufferException;
|
|
||||||
import java.nio.channels.FileChannel;
|
|
||||||
import java.nio.channels.GatheringByteChannel;
|
|
||||||
import java.nio.channels.ScatteringByteChannel;
|
|
||||||
|
|
||||||
import io.netty.buffer.AbstractReferenceCountedByteBuf;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.buffer.SwappedByteBuf;
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A NIO {@link ByteBuffer}, byte[] and address direct access wrapper.
|
|
||||||
* Only content manipulation operations are supported.
|
|
||||||
* Is best suited only for encoding/decoding purposes.
|
|
||||||
*/
|
|
||||||
public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
|
|
||||||
|
|
||||||
private static final byte ZERO = 0;
|
|
||||||
private ByteBuffer buffer;
|
|
||||||
private int arrayOffset;
|
|
||||||
private byte[] array;
|
|
||||||
private long memoryAddress;
|
|
||||||
|
|
||||||
public UnpooledUnsafeDirectByteBufWrapper() {
|
|
||||||
super(0);
|
|
||||||
this.buffer = null;
|
|
||||||
this.arrayOffset = -1;
|
|
||||||
this.array = null;
|
|
||||||
this.memoryAddress = 0L;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void wrap(long address, int length) {
|
|
||||||
this.memoryAddress = address;
|
|
||||||
this.arrayOffset = -1;
|
|
||||||
this.array = null;
|
|
||||||
this.buffer = null;
|
|
||||||
clear();
|
|
||||||
maxCapacity(length);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void wrap(byte[] array, int srcIndex, int length) {
|
|
||||||
if (array != null) {
|
|
||||||
this.memoryAddress = 0L;
|
|
||||||
this.arrayOffset = srcIndex;
|
|
||||||
this.array = array;
|
|
||||||
this.buffer = null;
|
|
||||||
clear();
|
|
||||||
maxCapacity(length);
|
|
||||||
} else {
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void wrap(ByteBuffer buffer, int srcIndex, int length) {
|
|
||||||
if (buffer != null) {
|
|
||||||
this.buffer = buffer;
|
|
||||||
if (buffer.isDirect()) {
|
|
||||||
this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
|
|
||||||
this.arrayOffset = -1;
|
|
||||||
this.array = null;
|
|
||||||
} else {
|
|
||||||
this.arrayOffset = srcIndex;
|
|
||||||
this.array = buffer.array();
|
|
||||||
this.memoryAddress = 0L;
|
|
||||||
}
|
|
||||||
clear();
|
|
||||||
maxCapacity(length);
|
|
||||||
} else {
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void reset() {
|
|
||||||
this.buffer = null;
|
|
||||||
this.memoryAddress = 0L;
|
|
||||||
this.arrayOffset = -1;
|
|
||||||
this.array = null;
|
|
||||||
clear();
|
|
||||||
maxCapacity(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDirect() {
|
|
||||||
return buffer != null ? buffer.isDirect() : false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int capacity() {
|
|
||||||
return maxCapacity();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf capacity(int newCapacity) {
|
|
||||||
if (newCapacity != maxCapacity()) {
|
|
||||||
throw new IllegalArgumentException("can't set a capacity different from the max allowed one");
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBufAllocator alloc() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteOrder order() {
|
|
||||||
return ByteOrder.BIG_ENDIAN;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasArray() {
|
|
||||||
return array != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte[] array() {
|
|
||||||
return array;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int arrayOffset() {
|
|
||||||
return arrayOffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasMemoryAddress() {
|
|
||||||
return memoryAddress != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long memoryAddress() {
|
|
||||||
return memoryAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected byte _getByte(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getByte(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getByte(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected short _getShort(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getShort(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getShort(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected short _getShortLE(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getShortLE(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getShortLE(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int _getUnsignedMedium(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getUnsignedMedium(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int _getUnsignedMediumLE(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getUnsignedMediumLE(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int _getInt(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getInt(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getInt(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int _getIntLE(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getIntLE(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getIntLE(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected long _getLong(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getLong(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getLong(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected long _getLongLE(int index) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
return UnsafeByteBufUtil.getLongLE(addr(index));
|
|
||||||
} else {
|
|
||||||
return UnsafeByteBufUtil.getLongLE(array, idx(index));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
|
|
||||||
} else {
|
|
||||||
final int idx = idx(index);
|
|
||||||
checkDstIndex(idx, length, dstIndex, dst.capacity());
|
|
||||||
getBytes(array, idx, dst, dstIndex, length);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void getBytes(byte[] array, int idx, ByteBuf dst, int dstIndex, int length) {
|
|
||||||
if (dst.hasMemoryAddress()) {
|
|
||||||
PlatformDependent.copyMemory(array, idx, dst.memoryAddress() + dstIndex, length);
|
|
||||||
} else if (dst.hasArray()) {
|
|
||||||
System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dstIndex, length);
|
|
||||||
} else {
|
|
||||||
dst.setBytes(dstIndex, array, idx, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
|
|
||||||
} else {
|
|
||||||
final int idx = idx(index);
|
|
||||||
checkDstIndex(idx, length, dstIndex, dst.length);
|
|
||||||
System.arraycopy(array, idx, dst, dstIndex, length);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf getBytes(int index, ByteBuffer dst) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
|
|
||||||
} else {
|
|
||||||
final int idx = idx(index);
|
|
||||||
checkIndex(idx, dst.remaining());
|
|
||||||
getBytes(array, idx, dst);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void getBytes(byte[] array, int idx, ByteBuffer dst) {
|
|
||||||
if (dst.remaining() == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (dst.isDirect()) {
|
|
||||||
if (dst.isReadOnly()) {
|
|
||||||
// We need to check if dst is ready-only so we not write something in it by using Unsafe.
|
|
||||||
throw new ReadOnlyBufferException();
|
|
||||||
}
|
|
||||||
// Copy to direct memory
|
|
||||||
final long dstAddress = PlatformDependent.directBufferAddress(dst);
|
|
||||||
PlatformDependent.copyMemory(array, idx, dstAddress + dst.position(), dst.remaining());
|
|
||||||
dst.position(dst.position() + dst.remaining());
|
|
||||||
} else if (dst.hasArray()) {
|
|
||||||
// Copy to array
|
|
||||||
System.arraycopy(array, idx, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
|
|
||||||
dst.position(dst.position() + dst.remaining());
|
|
||||||
} else {
|
|
||||||
dst.put(array, idx, dst.remaining());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf readBytes(ByteBuffer dst) {
|
|
||||||
final int length = dst.remaining();
|
|
||||||
checkReadableBytes(length);
|
|
||||||
int rIndex = readerIndex();
|
|
||||||
getBytes(rIndex, dst);
|
|
||||||
readerIndex(rIndex + length);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setByte(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setByte(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setByte(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setShort(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setShort(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setShort(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setShortLE(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setShortLE(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setShortLE(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setMedium(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setMedium(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setMedium(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setMediumLE(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setMediumLE(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setMediumLE(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setInt(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setInt(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setInt(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setIntLE(int index, int value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setIntLE(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setIntLE(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setLong(int index, long value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setLong(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setLong(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void _setLongLE(int index, long value) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setLongLE(addr(index), value);
|
|
||||||
} else {
|
|
||||||
UnsafeByteBufUtil.setLongLE(array, idx(index), value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
|
|
||||||
} else {
|
|
||||||
final int idx = idx(index);
|
|
||||||
checkSrcIndex(idx, length, srcIndex, src.capacity());
|
|
||||||
setBytes(array, idx, src, srcIndex, length);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setBytes(byte[] array, int idx, ByteBuf src, int srcIndex, int length) {
|
|
||||||
if (src.hasMemoryAddress()) {
|
|
||||||
PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, idx, length);
|
|
||||||
} else if (src.hasArray()) {
|
|
||||||
System.arraycopy(src.array(), src.arrayOffset() + srcIndex, array, idx, length);
|
|
||||||
} else {
|
|
||||||
src.getBytes(srcIndex, array, idx, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
|
|
||||||
} else {
|
|
||||||
final int idx = idx(index);
|
|
||||||
checkSrcIndex(idx, length, srcIndex, src.length);
|
|
||||||
System.arraycopy(src, srcIndex, array, idx, length);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf setBytes(int index, ByteBuffer src) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
|
|
||||||
} else {
|
|
||||||
final int idx = idx(index);
|
|
||||||
checkSrcIndex(idx, src.remaining(), src.position(), src.capacity());
|
|
||||||
setBytes(array, idx(index), src);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setBytes(byte[] array, int idx, ByteBuffer src) {
|
|
||||||
final int length = src.remaining();
|
|
||||||
if (length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (src.isDirect()) {
|
|
||||||
// Copy from direct memory
|
|
||||||
final long srcAddress = PlatformDependent.directBufferAddress(src);
|
|
||||||
PlatformDependent.copyMemory(srcAddress + src.position(), array, idx, length);
|
|
||||||
src.position(src.position() + length);
|
|
||||||
} else if (src.hasArray()) {
|
|
||||||
// Copy from array
|
|
||||||
System.arraycopy(src.array(), src.arrayOffset() + src.position(), array, idx, length);
|
|
||||||
src.position(src.position() + length);
|
|
||||||
} else {
|
|
||||||
src.get(array, idx, src.remaining());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int readBytes(GatheringByteChannel out, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int readBytes(FileChannel out, long position, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int setBytes(int index, InputStream in, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int nioBufferCount() {
|
|
||||||
return buffer == null ? 0 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public ByteBuffer[] nioBuffers(int index, int length) {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public ByteBuf copy(int index, int length) {
|
|
||||||
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public ByteBuffer internalNioBuffer(int index, int length) {
|
|
||||||
throw new UnsupportedOperationException("cannot access directly the wrapped buffer!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public ByteBuffer nioBuffer(int index, int length) {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
protected void deallocate() {
|
|
||||||
//NO_OP
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf unwrap() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private long addr(int index) {
|
|
||||||
return memoryAddress + index;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int idx(int index) {
|
|
||||||
return arrayOffset + index;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
protected SwappedByteBuf newSwappedByteBuf() {
|
|
||||||
throw new UnsupportedOperationException("unsupported!");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf setZero(int index, int length) {
|
|
||||||
if (hasMemoryAddress()) {
|
|
||||||
if (length == 0) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
this.checkIndex(index, length);
|
|
||||||
PlatformDependent.setMemory(addr(index), length, ZERO);
|
|
||||||
} else {
|
|
||||||
//prefer Arrays::fill here?
|
|
||||||
UnsafeByteBufUtil.setZero(array, idx(index), length);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ByteBuf writeZero(int length) {
|
|
||||||
ensureWritable(length);
|
|
||||||
int wIndex = writerIndex();
|
|
||||||
setZero(wIndex, length);
|
|
||||||
writerIndex(wIndex + length);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,565 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
/* Copyright 2015 The Netty Project */
|
|
||||||
package org.apache.activemq.artemis.core.io.buffer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.ByteOrder;
|
|
||||||
import java.nio.ReadOnlyBufferException;
|
|
||||||
|
|
||||||
import io.netty.buffer.AbstractByteBuf;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.util.IllegalReferenceCountException;
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
|
||||||
import io.netty.util.internal.SystemPropertyUtil;
|
|
||||||
|
|
||||||
import static io.netty.util.internal.MathUtil.isOutOfBounds;
|
|
||||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
|
||||||
import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* All operations get and set as {@link ByteOrder#BIG_ENDIAN}.
|
|
||||||
*/
|
|
||||||
final class UnsafeByteBufUtil {
|
|
||||||
|
|
||||||
private static final boolean UNALIGNED = PlatformDependent.isUnaligned();
|
|
||||||
|
|
||||||
private static final byte ZERO = 0;
|
|
||||||
|
|
||||||
static byte getByte(long address) {
|
|
||||||
return PlatformDependent.getByte(address);
|
|
||||||
}
|
|
||||||
|
|
||||||
static short getShort(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
short v = PlatformDependent.getShort(address);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? v : Short.reverseBytes(v);
|
|
||||||
}
|
|
||||||
return (short) (PlatformDependent.getByte(address) << 8 | PlatformDependent.getByte(address + 1) & 0xff);
|
|
||||||
}
|
|
||||||
|
|
||||||
static short getShortLE(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
short v = PlatformDependent.getShort(address);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(v) : v;
|
|
||||||
}
|
|
||||||
return (short) (PlatformDependent.getByte(address) & 0xff | PlatformDependent.getByte(address + 1) << 8);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getUnsignedMedium(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
return (PlatformDependent.getByte(address) & 0xff) << 16 | (BIG_ENDIAN_NATIVE_ORDER ? PlatformDependent.getShort(address + 1) : Short.reverseBytes(PlatformDependent.getShort(address + 1))) & 0xffff;
|
|
||||||
}
|
|
||||||
return (PlatformDependent.getByte(address) & 0xff) << 16 | (PlatformDependent.getByte(address + 1) & 0xff) << 8 | PlatformDependent.getByte(address + 2) & 0xff;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getUnsignedMediumLE(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
return (PlatformDependent.getByte(address) & 0xff) | ((BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(PlatformDependent.getShort(address + 1)) : PlatformDependent.getShort(address + 1)) & 0xffff) << 8;
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(address) & 0xff | (PlatformDependent.getByte(address + 1) & 0xff) << 8 | (PlatformDependent.getByte(address + 2) & 0xff) << 16;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getInt(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
int v = PlatformDependent.getInt(address);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? v : Integer.reverseBytes(v);
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(address) << 24 | (PlatformDependent.getByte(address + 1) & 0xff) << 16 | (PlatformDependent.getByte(address + 2) & 0xff) << 8 | PlatformDependent.getByte(address + 3) & 0xff;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getIntLE(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
int v = PlatformDependent.getInt(address);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(v) : v;
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(address) & 0xff | (PlatformDependent.getByte(address + 1) & 0xff) << 8 | (PlatformDependent.getByte(address + 2) & 0xff) << 16 | PlatformDependent.getByte(address + 3) << 24;
|
|
||||||
}
|
|
||||||
|
|
||||||
static long getLong(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
long v = PlatformDependent.getLong(address);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v);
|
|
||||||
}
|
|
||||||
return ((long) PlatformDependent.getByte(address)) << 56 | (PlatformDependent.getByte(address + 1) & 0xffL) << 48 | (PlatformDependent.getByte(address + 2) & 0xffL) << 40 | (PlatformDependent.getByte(address + 3) & 0xffL) << 32 | (PlatformDependent.getByte(address + 4) & 0xffL) << 24 | (PlatformDependent.getByte(address + 5) & 0xffL) << 16 | (PlatformDependent.getByte(address + 6) & 0xffL) << 8 | (PlatformDependent.getByte(address + 7)) & 0xffL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static long getLongLE(long address) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
long v = PlatformDependent.getLong(address);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(v) : v;
|
|
||||||
}
|
|
||||||
return (PlatformDependent.getByte(address)) & 0xffL | (PlatformDependent.getByte(address + 1) & 0xffL) << 8 | (PlatformDependent.getByte(address + 2) & 0xffL) << 16 | (PlatformDependent.getByte(address + 3) & 0xffL) << 24 | (PlatformDependent.getByte(address + 4) & 0xffL) << 32 | (PlatformDependent.getByte(address + 5) & 0xffL) << 40 | (PlatformDependent.getByte(address + 6) & 0xffL) << 48 | ((long) PlatformDependent.getByte(address + 7)) << 56;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setByte(long address, int value) {
|
|
||||||
PlatformDependent.putByte(address, (byte) value);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setShort(long address, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(address, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setShortLE(long address, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(address, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) value) : (short) value);
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address, (byte) value);
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setMedium(long address, int value) {
|
|
||||||
PlatformDependent.putByte(address, (byte) (value >>> 16));
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(address + 1, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 2, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setMediumLE(long address, int value) {
|
|
||||||
PlatformDependent.putByte(address, (byte) value);
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(address + 1, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) (value >>> 8)) : (short) (value >>> 8));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 2, (byte) (value >>> 16));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setInt(long address, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address, (byte) (value >>> 24));
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(address + 2, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 3, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setIntLE(long address, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(value) : value);
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address, (byte) value);
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 2, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(address + 3, (byte) (value >>> 24));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setLong(long address, long value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putLong(address, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address, (byte) (value >>> 56));
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 48));
|
|
||||||
PlatformDependent.putByte(address + 2, (byte) (value >>> 40));
|
|
||||||
PlatformDependent.putByte(address + 3, (byte) (value >>> 32));
|
|
||||||
PlatformDependent.putByte(address + 4, (byte) (value >>> 24));
|
|
||||||
PlatformDependent.putByte(address + 5, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(address + 6, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 7, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setLongLE(long address, long value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putLong(address, BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(value) : value);
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(address, (byte) value);
|
|
||||||
PlatformDependent.putByte(address + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(address + 2, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(address + 3, (byte) (value >>> 24));
|
|
||||||
PlatformDependent.putByte(address + 4, (byte) (value >>> 32));
|
|
||||||
PlatformDependent.putByte(address + 5, (byte) (value >>> 40));
|
|
||||||
PlatformDependent.putByte(address + 6, (byte) (value >>> 48));
|
|
||||||
PlatformDependent.putByte(address + 7, (byte) (value >>> 56));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static byte getByte(byte[] array, int index) {
|
|
||||||
return PlatformDependent.getByte(array, index);
|
|
||||||
}
|
|
||||||
|
|
||||||
static short getShort(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
short v = PlatformDependent.getShort(array, index);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? v : Short.reverseBytes(v);
|
|
||||||
}
|
|
||||||
return (short) (PlatformDependent.getByte(array, index) << 8 | PlatformDependent.getByte(array, index + 1) & 0xff);
|
|
||||||
}
|
|
||||||
|
|
||||||
static short getShortLE(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
short v = PlatformDependent.getShort(array, index);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(v) : v;
|
|
||||||
}
|
|
||||||
return (short) (PlatformDependent.getByte(array, index) & 0xff | PlatformDependent.getByte(array, index + 1) << 8);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getUnsignedMedium(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
return (PlatformDependent.getByte(array, index) & 0xff) << 16 | (BIG_ENDIAN_NATIVE_ORDER ? PlatformDependent.getShort(array, index + 1) : Short.reverseBytes(PlatformDependent.getShort(array, index + 1))) & 0xffff;
|
|
||||||
}
|
|
||||||
return (PlatformDependent.getByte(array, index) & 0xff) << 16 | (PlatformDependent.getByte(array, index + 1) & 0xff) << 8 | PlatformDependent.getByte(array, index + 2) & 0xff;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getUnsignedMediumLE(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
return (PlatformDependent.getByte(array, index) & 0xff) | ((BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(PlatformDependent.getShort(array, index + 1)) : PlatformDependent.getShort(array, index + 1)) & 0xffff) << 8;
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(array, index) & 0xff | (PlatformDependent.getByte(array, index + 1) & 0xff) << 8 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 16;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getInt(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
int v = PlatformDependent.getInt(array, index);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? v : Integer.reverseBytes(v);
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(array, index) << 24 | (PlatformDependent.getByte(array, index + 1) & 0xff) << 16 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 8 | PlatformDependent.getByte(array, index + 3) & 0xff;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int getIntLE(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
int v = PlatformDependent.getInt(array, index);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(v) : v;
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(array, index) & 0xff | (PlatformDependent.getByte(array, index + 1) & 0xff) << 8 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 16 | PlatformDependent.getByte(array, index + 3) << 24;
|
|
||||||
}
|
|
||||||
|
|
||||||
static long getLong(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
long v = PlatformDependent.getLong(array, index);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v);
|
|
||||||
}
|
|
||||||
return ((long) PlatformDependent.getByte(array, index)) << 56 | (PlatformDependent.getByte(array, index + 1) & 0xffL) << 48 | (PlatformDependent.getByte(array, index + 2) & 0xffL) << 40 | (PlatformDependent.getByte(array, index + 3) & 0xffL) << 32 | (PlatformDependent.getByte(array, index + 4) & 0xffL) << 24 | (PlatformDependent.getByte(array, index + 5) & 0xffL) << 16 | (PlatformDependent.getByte(array, index + 6) & 0xffL) << 8 | (PlatformDependent.getByte(array, index + 7)) & 0xffL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static long getLongLE(byte[] array, int index) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
long v = PlatformDependent.getLong(array, index);
|
|
||||||
return BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(v) : v;
|
|
||||||
}
|
|
||||||
return PlatformDependent.getByte(array, index) & 0xffL | (PlatformDependent.getByte(array, index + 1) & 0xffL) << 8 | (PlatformDependent.getByte(array, index + 2) & 0xffL) << 16 | (PlatformDependent.getByte(array, index + 3) & 0xffL) << 24 | (PlatformDependent.getByte(array, index + 4) & 0xffL) << 32 | (PlatformDependent.getByte(array, index + 5) & 0xffL) << 40 | (PlatformDependent.getByte(array, index + 6) & 0xffL) << 48 | ((long) PlatformDependent.getByte(array, index + 7)) << 56;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setByte(byte[] array, int index, int value) {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) value);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setShort(byte[] array, int index, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(array, index, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setShortLE(byte[] array, int index, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(array, index, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) value) : (short) value);
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) value);
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setMedium(byte[] array, int index, int value) {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) (value >>> 16));
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(array, index + 1, BIG_ENDIAN_NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 2, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setMediumLE(byte[] array, int index, int value) {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) value);
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putShort(array, index + 1, BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes((short) (value >>> 8)) : (short) (value >>> 8));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 2, (byte) (value >>> 16));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setInt(byte[] array, int index, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putInt(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) (value >>> 24));
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(array, index + 2, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 3, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setIntLE(byte[] array, int index, int value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putInt(array, index, BIG_ENDIAN_NATIVE_ORDER ? Integer.reverseBytes(value) : value);
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) value);
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 2, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(array, index + 3, (byte) (value >>> 24));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setLong(byte[] array, int index, long value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value));
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) (value >>> 56));
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 48));
|
|
||||||
PlatformDependent.putByte(array, index + 2, (byte) (value >>> 40));
|
|
||||||
PlatformDependent.putByte(array, index + 3, (byte) (value >>> 32));
|
|
||||||
PlatformDependent.putByte(array, index + 4, (byte) (value >>> 24));
|
|
||||||
PlatformDependent.putByte(array, index + 5, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(array, index + 6, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 7, (byte) value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setLongLE(byte[] array, int index, long value) {
|
|
||||||
if (UNALIGNED) {
|
|
||||||
PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? Long.reverseBytes(value) : value);
|
|
||||||
} else {
|
|
||||||
PlatformDependent.putByte(array, index, (byte) value);
|
|
||||||
PlatformDependent.putByte(array, index + 1, (byte) (value >>> 8));
|
|
||||||
PlatformDependent.putByte(array, index + 2, (byte) (value >>> 16));
|
|
||||||
PlatformDependent.putByte(array, index + 3, (byte) (value >>> 24));
|
|
||||||
PlatformDependent.putByte(array, index + 4, (byte) (value >>> 32));
|
|
||||||
PlatformDependent.putByte(array, index + 5, (byte) (value >>> 40));
|
|
||||||
PlatformDependent.putByte(array, index + 6, (byte) (value >>> 48));
|
|
||||||
PlatformDependent.putByte(array, index + 7, (byte) (value >>> 56));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setZero(byte[] array, int index, int length) {
|
|
||||||
if (length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
PlatformDependent.setMemory(array, index, length, ZERO);
|
|
||||||
}
|
|
||||||
|
|
||||||
static ByteBuf copy(AbstractByteBuf buf, long addr, int index, int length) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
ByteBuf copy = buf.alloc().directBuffer(length, buf.maxCapacity());
|
|
||||||
if (length != 0) {
|
|
||||||
if (copy.hasMemoryAddress()) {
|
|
||||||
PlatformDependent.copyMemory(addr, copy.memoryAddress(), length);
|
|
||||||
copy.setIndex(0, length);
|
|
||||||
} else {
|
|
||||||
copy.writeBytes(buf, index, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return copy;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int setBytes(AbstractByteBuf buf, long addr, int index, InputStream in, int length) throws IOException {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
ByteBuf tmpBuf = buf.alloc().heapBuffer(length);
|
|
||||||
try {
|
|
||||||
byte[] tmp = tmpBuf.array();
|
|
||||||
int offset = tmpBuf.arrayOffset();
|
|
||||||
int readBytes = in.read(tmp, offset, length);
|
|
||||||
if (readBytes > 0) {
|
|
||||||
PlatformDependent.copyMemory(tmp, offset, addr, readBytes);
|
|
||||||
}
|
|
||||||
return readBytes;
|
|
||||||
} finally {
|
|
||||||
tmpBuf.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getBytes(AbstractByteBuf buf, long addr, int index, ByteBuf dst, int dstIndex, int length) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
checkNotNull(dst, "dst");
|
|
||||||
if (isOutOfBounds(dstIndex, length, dst.capacity())) {
|
|
||||||
throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dst.hasMemoryAddress()) {
|
|
||||||
PlatformDependent.copyMemory(addr, dst.memoryAddress() + dstIndex, length);
|
|
||||||
} else if (dst.hasArray()) {
|
|
||||||
PlatformDependent.copyMemory(addr, dst.array(), dst.arrayOffset() + dstIndex, length);
|
|
||||||
} else {
|
|
||||||
dst.setBytes(dstIndex, buf, index, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getBytes(AbstractByteBuf buf, long addr, int index, byte[] dst, int dstIndex, int length) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
checkNotNull(dst, "dst");
|
|
||||||
if (isOutOfBounds(dstIndex, length, dst.length)) {
|
|
||||||
throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
|
|
||||||
}
|
|
||||||
if (length != 0) {
|
|
||||||
PlatformDependent.copyMemory(addr, dst, dstIndex, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getBytes(AbstractByteBuf buf, long addr, int index, ByteBuffer dst) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, dst.remaining());
|
|
||||||
if (dst.remaining() == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dst.isDirect()) {
|
|
||||||
if (dst.isReadOnly()) {
|
|
||||||
// We need to check if dst is ready-only so we not write something in it by using Unsafe.
|
|
||||||
throw new ReadOnlyBufferException();
|
|
||||||
}
|
|
||||||
// Copy to direct memory
|
|
||||||
long dstAddress = PlatformDependent.directBufferAddress(dst);
|
|
||||||
PlatformDependent.copyMemory(addr, dstAddress + dst.position(), dst.remaining());
|
|
||||||
dst.position(dst.position() + dst.remaining());
|
|
||||||
} else if (dst.hasArray()) {
|
|
||||||
// Copy to array
|
|
||||||
PlatformDependent.copyMemory(addr, dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
|
|
||||||
dst.position(dst.position() + dst.remaining());
|
|
||||||
} else {
|
|
||||||
dst.put(buf.nioBuffer());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setBytes(AbstractByteBuf buf, long addr, int index, ByteBuf src, int srcIndex, int length) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
checkNotNull(src, "src");
|
|
||||||
if (isOutOfBounds(srcIndex, length, src.capacity())) {
|
|
||||||
throw new IndexOutOfBoundsException("srcIndex: " + srcIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (length != 0) {
|
|
||||||
if (src.hasMemoryAddress()) {
|
|
||||||
PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr, length);
|
|
||||||
} else if (src.hasArray()) {
|
|
||||||
PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr, length);
|
|
||||||
} else {
|
|
||||||
src.getBytes(srcIndex, buf, index, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
if (length != 0) {
|
|
||||||
PlatformDependent.copyMemory(src, srcIndex, addr, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setBytes(AbstractByteBuf buf, long addr, int index, ByteBuffer src) {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, src.remaining());
|
|
||||||
|
|
||||||
int length = src.remaining();
|
|
||||||
if (length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (src.isDirect()) {
|
|
||||||
// Copy from direct memory
|
|
||||||
long srcAddress = PlatformDependent.directBufferAddress(src);
|
|
||||||
PlatformDependent.copyMemory(srcAddress + src.position(), addr, src.remaining());
|
|
||||||
src.position(src.position() + length);
|
|
||||||
} else if (src.hasArray()) {
|
|
||||||
// Copy from array
|
|
||||||
PlatformDependent.copyMemory(src.array(), src.arrayOffset() + src.position(), addr, length);
|
|
||||||
src.position(src.position() + length);
|
|
||||||
} else {
|
|
||||||
ByteBuf tmpBuf = buf.alloc().heapBuffer(length);
|
|
||||||
try {
|
|
||||||
byte[] tmp = tmpBuf.array();
|
|
||||||
src.get(tmp, tmpBuf.arrayOffset(), length); // moves the src position too
|
|
||||||
PlatformDependent.copyMemory(tmp, tmpBuf.arrayOffset(), addr, length);
|
|
||||||
} finally {
|
|
||||||
tmpBuf.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getBytes(AbstractByteBuf buf, long addr, int index, OutputStream out, int length) throws IOException {
|
|
||||||
//modified to use this classes checkIndex
|
|
||||||
checkIndex(buf, index, length);
|
|
||||||
if (length != 0) {
|
|
||||||
ByteBuf tmpBuf = buf.alloc().heapBuffer(length);
|
|
||||||
try {
|
|
||||||
byte[] tmp = tmpBuf.array();
|
|
||||||
int offset = tmpBuf.arrayOffset();
|
|
||||||
PlatformDependent.copyMemory(addr, tmp, offset, length);
|
|
||||||
out.write(tmp, offset, length);
|
|
||||||
} finally {
|
|
||||||
tmpBuf.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setZero(long addr, int length) {
|
|
||||||
if (length == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
PlatformDependent.setMemory(addr, length, ZERO);
|
|
||||||
}
|
|
||||||
|
|
||||||
//removed newUnsafeDirectByteBuf method
|
|
||||||
|
|
||||||
//add a modified copy of checkIndex and ensureAccessible methods from ByteBuf (because we don't have package private access)
|
|
||||||
|
|
||||||
private static void checkIndex(ByteBuf byteBuf, int index) {
|
|
||||||
checkIndex(byteBuf, index, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void checkIndex(ByteBuf byteBuf, int index, int fieldLength) {
|
|
||||||
ensureAccessible(byteBuf);
|
|
||||||
checkIndex0(byteBuf, index, fieldLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void ensureAccessible(ByteBuf byteBuf) {
|
|
||||||
if (SystemPropertyUtil.getBoolean("io.netty.buffer.bytebuf.checkAccessible", true) && byteBuf.refCnt() == 0) {
|
|
||||||
throw new IllegalReferenceCountException(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void checkIndex0(ByteBuf byteBuf, int index, int fieldLength) {
|
|
||||||
if (isOutOfBounds(index, fieldLength, byteBuf.capacity())) {
|
|
||||||
throw new IndexOutOfBoundsException(String.format("index: %d, length: %d (expected: range(0, %d))", index, fieldLength, byteBuf.capacity()));
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -24,9 +24,9 @@ import java.nio.channels.FileChannel;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
import org.apache.activemq.artemis.core.io.buffer.UnpooledUnsafeDirectByteBufWrapper;
|
|
||||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
import org.apache.activemq.artemis.utils.Env;
|
import org.apache.activemq.artemis.utils.Env;
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ final class MappedFile implements AutoCloseable {
|
||||||
private final MappedByteBuffer buffer;
|
private final MappedByteBuffer buffer;
|
||||||
private final FileChannel channel;
|
private final FileChannel channel;
|
||||||
private final long address;
|
private final long address;
|
||||||
private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
|
private final ByteBuf byteBufWrapper;
|
||||||
private final ChannelBufferWrapper channelBufferWrapper;
|
private final ChannelBufferWrapper channelBufferWrapper;
|
||||||
private int position;
|
private int position;
|
||||||
private int length;
|
private int length;
|
||||||
|
@ -46,7 +46,7 @@ final class MappedFile implements AutoCloseable {
|
||||||
this.buffer = byteBuffer;
|
this.buffer = byteBuffer;
|
||||||
this.position = position;
|
this.position = position;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
|
this.byteBufWrapper = Unpooled.wrappedBuffer(buffer);
|
||||||
this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
|
this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
|
||||||
this.address = PlatformDependent.directBufferAddress(buffer);
|
this.address = PlatformDependent.directBufferAddress(buffer);
|
||||||
}
|
}
|
||||||
|
@ -175,13 +175,10 @@ final class MappedFile implements AutoCloseable {
|
||||||
*/
|
*/
|
||||||
public void write(EncodingSupport encodingSupport) throws IOException {
|
public void write(EncodingSupport encodingSupport) throws IOException {
|
||||||
final int encodedSize = encodingSupport.getEncodeSize();
|
final int encodedSize = encodingSupport.getEncodeSize();
|
||||||
this.byteBufWrapper.wrap(this.buffer, this.position, encodedSize);
|
this.byteBufWrapper.setIndex(this.position, this.position);
|
||||||
try {
|
|
||||||
encodingSupport.encode(this.channelBufferWrapper);
|
encodingSupport.encode(this.channelBufferWrapper);
|
||||||
} finally {
|
|
||||||
this.byteBufWrapper.reset();
|
|
||||||
}
|
|
||||||
position += encodedSize;
|
position += encodedSize;
|
||||||
|
assert (byteBufWrapper.writerIndex() == position);
|
||||||
if (position > this.length) {
|
if (position > this.length) {
|
||||||
this.length = position;
|
this.length = position;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
|
@ -34,7 +35,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.buffer.UnpooledUnsafeDirectByteBufWrapper;
|
|
||||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
|
@ -89,9 +89,6 @@ public final class Page implements Comparable<Page> {
|
||||||
|
|
||||||
private boolean canBeMapped;
|
private boolean canBeMapped;
|
||||||
|
|
||||||
private final ActiveMQBuffer activeMQBuffer;
|
|
||||||
private final UnpooledUnsafeDirectByteBufWrapper unsafeByteBufWrapper;
|
|
||||||
|
|
||||||
public Page(final SimpleString storeName,
|
public Page(final SimpleString storeName,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final SequentialFileFactory factory,
|
final SequentialFileFactory factory,
|
||||||
|
@ -104,8 +101,6 @@ public final class Page implements Comparable<Page> {
|
||||||
this.storeName = storeName;
|
this.storeName = storeName;
|
||||||
this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory;
|
this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory;
|
||||||
//pooled buffers to avoid allocations on hot paths
|
//pooled buffers to avoid allocations on hot paths
|
||||||
this.unsafeByteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
|
|
||||||
this.activeMQBuffer = new ChannelBufferWrapper(this.unsafeByteBufWrapper);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getPageId() {
|
public int getPageId() {
|
||||||
|
@ -152,22 +147,20 @@ public final class Page implements Comparable<Page> {
|
||||||
file.read(buffer);
|
file.read(buffer);
|
||||||
buffer.rewind();
|
buffer.rewind();
|
||||||
assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
|
assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
|
||||||
this.unsafeByteBufWrapper.wrap(buffer, 0, fileSize);
|
ChannelBufferWrapper activeMQBuffer = wrapBuffer(fileSize, buffer);
|
||||||
try {
|
read(storage, activeMQBuffer, messages);
|
||||||
this.activeMQBuffer.clear();
|
|
||||||
this.activeMQBuffer.writerIndex(fileSize);
|
|
||||||
read(storage, this.activeMQBuffer, messages);
|
|
||||||
} finally {
|
|
||||||
this.unsafeByteBufWrapper.reset();
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
this.fileFactory.releaseBuffer(buffer);
|
this.fileFactory.releaseBuffer(buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ChannelBufferWrapper wrapBuffer(int fileSize, ByteBuffer buffer) {
|
||||||
|
ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
|
||||||
|
return activeMQBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
private static MappedByteBuffer mapFileForRead(File file, int fileSize) {
|
private static MappedByteBuffer mapFileForRead(File file, int fileSize) {
|
||||||
try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
|
try (RandomAccessFile raf = new RandomAccessFile(file, "rw"); FileChannel channel = raf.getChannel()) {
|
||||||
FileChannel channel = raf.getChannel()) {
|
|
||||||
return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
|
return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
|
@ -179,13 +172,10 @@ public final class Page implements Comparable<Page> {
|
||||||
//use a readonly mapped view of the file
|
//use a readonly mapped view of the file
|
||||||
final int mappedSize = size.get();
|
final int mappedSize = size.get();
|
||||||
final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
|
final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
|
||||||
this.unsafeByteBufWrapper.wrap(mappedByteBuffer, 0, mappedSize);
|
ChannelBufferWrapper activeMQBuffer = wrapBuffer(mappedSize, mappedByteBuffer);
|
||||||
try {
|
try {
|
||||||
this.activeMQBuffer.clear();
|
return read(storage, activeMQBuffer, messages);
|
||||||
this.activeMQBuffer.writerIndex(mappedSize);
|
|
||||||
return read(storage, this.activeMQBuffer, messages);
|
|
||||||
} finally {
|
} finally {
|
||||||
this.unsafeByteBufWrapper.reset();
|
|
||||||
//unmap the file after read it to avoid GC to take care of it
|
//unmap the file after read it to avoid GC to take care of it
|
||||||
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
|
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
|
||||||
}
|
}
|
||||||
|
@ -238,14 +228,13 @@ public final class Page implements Comparable<Page> {
|
||||||
final int messageEncodedSize = message.getEncodeSize();
|
final int messageEncodedSize = message.getEncodeSize();
|
||||||
final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
|
final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
|
||||||
final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
|
final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
|
||||||
this.unsafeByteBufWrapper.wrap(buffer, 0, bufferSize);
|
ChannelBufferWrapper activeMQBuffer = wrapBuffer(bufferSize, buffer);
|
||||||
try {
|
activeMQBuffer.clear();
|
||||||
this.activeMQBuffer.clear();
|
activeMQBuffer.writeByte(Page.START_BYTE);
|
||||||
this.activeMQBuffer.writeByte(Page.START_BYTE);
|
activeMQBuffer.writeInt(messageEncodedSize);
|
||||||
this.activeMQBuffer.writeInt(messageEncodedSize);
|
message.encode(activeMQBuffer);
|
||||||
message.encode(this.activeMQBuffer);
|
activeMQBuffer.writeByte(Page.END_BYTE);
|
||||||
this.activeMQBuffer.writeByte(Page.END_BYTE);
|
assert (activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected";
|
||||||
assert (this.activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected";
|
|
||||||
//buffer limit and position are the same
|
//buffer limit and position are the same
|
||||||
assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed";
|
assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed";
|
||||||
file.writeDirect(buffer, false);
|
file.writeDirect(buffer, false);
|
||||||
|
@ -256,9 +245,6 @@ public final class Page implements Comparable<Page> {
|
||||||
numberOfMessages.lazySet(numberOfMessages.get() + 1);
|
numberOfMessages.lazySet(numberOfMessages.get() + 1);
|
||||||
size.lazySet(size.get() + bufferSize);
|
size.lazySet(size.get() + bufferSize);
|
||||||
storageManager.pageWrite(message, pageId);
|
storageManager.pageWrite(message, pageId);
|
||||||
} finally {
|
|
||||||
this.unsafeByteBufWrapper.reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sync() throws Exception {
|
public void sync() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue