diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java index c129e3a770..f30ef3509e 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java @@ -1149,4 +1149,10 @@ public interface ActiveMQBuffer extends DataInput { * @return A converted NIO Buffer */ ByteBuffer toByteBuffer(int index, int length); + + /** + * Release any underlying resources held by this buffer + */ + void release(); + } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java index 22f0f9a9a4..849c9214bf 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core; import java.nio.ByteBuffer; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; @@ -26,6 +27,9 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; */ public final class ActiveMQBuffers { + + private static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(); + /** * Creates a self-expanding ActiveMQBuffer with the given initial size * @@ -36,6 +40,11 @@ public final class ActiveMQBuffers { return new ChannelBufferWrapper(Unpooled.buffer(size)); } + public static ActiveMQBuffer pooledBuffer(final int size) { + return new ChannelBufferWrapper(ALLOCATOR.heapBuffer(size),true, true); + } + + /** * Creates a self-expanding ActiveMQBuffer filled with the given byte array * diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java index 60262f809b..c75be21c63 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java @@ -31,7 +31,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { protected ByteBuf buffer; // NO_UCD (use final) private final boolean releasable; - + private final boolean isPooled; public static ByteBuf unwrap(ByteBuf buffer) { ByteBuf parent; while ((parent = buffer.unwrap()) != null && parent != buffer) { // this last part is just in case the semantic @@ -45,14 +45,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { public ChannelBufferWrapper(final ByteBuf buffer) { this(buffer, false); } - public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable) { + this(buffer, releasable, false); + } + public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable, boolean pooled) { if (!releasable) { this.buffer = Unpooled.unreleasableBuffer(buffer); } else { this.buffer = buffer; } this.releasable = releasable; + this.isPooled = pooled; + } @Override @@ -398,7 +402,19 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { @Override public ActiveMQBuffer readSlice(final int length) { - return new ChannelBufferWrapper(buffer.readSlice(length), releasable); + if ( isPooled ) { + ByteBuf fromBuffer = buffer.readSlice(length); + ByteBuf newNettyBuffer = Unpooled.buffer(fromBuffer.capacity()); + int read = fromBuffer.readerIndex(); + int writ = fromBuffer.writerIndex(); + fromBuffer.readerIndex(0); + fromBuffer.readBytes(newNettyBuffer,0,writ); + newNettyBuffer.setIndex(read,writ); + ActiveMQBuffer returnBuffer = new ChannelBufferWrapper(newNettyBuffer,releasable,false); + returnBuffer.setIndex(read,writ); + return returnBuffer; + } + return new ChannelBufferWrapper(buffer.readSlice(length), releasable, isPooled); } @Override @@ -522,6 +538,13 @@ public class ChannelBufferWrapper implements ActiveMQBuffer { return buffer.nioBuffer(index, length); } + @Override + public void release() { + if ( this.isPooled ) { + buffer.release(); + } + } + @Override public boolean writable() { return buffer.isWritable(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 24b7f1e96d..55f912903d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -202,6 +202,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); } + @Override + public void release() { + //no-op + } + @Override public int readerIndex() { return 0; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index 5a27f248e3..951aea2f62 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -524,6 +524,11 @@ public class LargeMessageControllerImpl implements LargeMessageController { throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); } + @Override + public void release() { + //no-op + } + @Override public int readerIndex() { return (int) readerIndex; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index 921f97d5a8..1e1981750b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -76,9 +77,9 @@ public abstract class MessageImpl implements MessageInternal { protected byte priority; - protected ActiveMQBuffer buffer; + protected volatile ActiveMQBuffer buffer; - protected ResetLimitWrappedActiveMQBuffer bodyBuffer; + protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer; protected volatile boolean bufferValid; @@ -434,12 +435,16 @@ public abstract class MessageImpl implements MessageInternal { @Override public void decodeFromBuffer(final ActiveMQBuffer buffer) { - this.buffer = buffer; + + this.buffer = copyMessageBuffer(buffer); decode(); + //synchronize indexes + buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex()); + // Setting up the BodyBuffer based on endOfBodyPosition set from decode - ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, null); + ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null); tmpbodyBuffer.readerIndex(BODY_OFFSET); tmpbodyBuffer.writerIndex(endOfBodyPosition); // only set this after the writer and reader is set, @@ -449,6 +454,30 @@ public abstract class MessageImpl implements MessageInternal { } + private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) { + ActiveMQBuffer copiedBuffer; + + ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() ); + + int read = buffer.byteBuf().readerIndex(); + int writ = buffer.byteBuf().writerIndex(); + + int readArt = buffer.readerIndex(); + int writArt = buffer.writerIndex(); + buffer.byteBuf().readerIndex( 0 ); + + buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() ); + buffer.byteBuf().setIndex( read, writ ); + newNettyBuffer.setIndex( read, writ ); + + copiedBuffer = new ChannelBufferWrapper( newNettyBuffer ); + + buffer.setIndex( readArt, writArt ); + copiedBuffer.setIndex( readArt, writArt ); + + return copiedBuffer; + } + @Override public void bodyChanged() { bufferValid = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index ddb734ee80..d7ae5b3496 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -63,6 +63,15 @@ public interface Packet { */ ActiveMQBuffer encode(RemotingConnection connection); + /** + * Encodes the packet and returns a {@link ActiveMQBuffer} containing the data + * + * @param connection the connection + * @param usePooled if the returned buffer should be pooled or unpooled + * @return the buffer to encode to + */ + ActiveMQBuffer encode(RemotingConnection connection, boolean usePooled); + /** * decodes the buffer into this packet * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 92f291c1d3..0bd1cff218 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -919,6 +919,8 @@ public class ActiveMQSessionContext extends SessionContext { ActiveMQBuffer buffer = packet.encode(this.getCoreConnection()); conn.write(buffer, false, false); + + buffer.release(); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 5f20f462fb..5d1e37ae3a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -305,6 +305,8 @@ public final class ChannelImpl implements Channel { // buffer is full, preventing any incoming buffers being handled and blocking failover connection.getTransportConnection().write(buffer, flush, batch); + buffer.release(); + return true; } } @@ -412,6 +414,7 @@ public final class ChannelImpl implements Channel { } } finally { lock.unlock(); + buffer.release(); } return response; @@ -634,6 +637,9 @@ public final class ChannelImpl implements Channel { final ActiveMQBuffer buffer = packet.encode(connection); connection.getTransportConnection().write(buffer, false, false); + + buffer.release(); + } private void addResendPacket(Packet packet) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 646eb28602..9025210c7a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -304,7 +304,13 @@ public class PacketImpl implements Packet { @Override public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE); + return encode(connection,true); + } + + + @Override + public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) { + ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled); // The standard header fields diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index 02ed2bf383..ce76186531 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -56,7 +56,7 @@ public class SessionReceiveMessage extends MessagePacket { public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = message.getEncodedBuffer(); - ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex()); + ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true); bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity()); bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index 91d43a565b..c7bb30ebb9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -68,7 +68,7 @@ public class SessionSendMessage extends MessagePacket { // this is for unit tests only bufferWrite = buffer.copy(0, buffer.capacity()); } else { - bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse + bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse } bufferWrite.writeBytes(buffer, 0, buffer.writerIndex()); bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 0b307eff5d..33dbf4b8b4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -210,6 +210,11 @@ public class NettyConnection implements Connection { @Override public ActiveMQBuffer createTransportBuffer(final int size) { + return createTransportBuffer(size, false); + } + + @Override + public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) { return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index a9e12aa6c2..6884243ee4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -178,6 +178,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { return transportConnection.createTransportBuffer(size); } + @Override + public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) { + return transportConnection.createTransportBuffer(size, pooled); + } + @Override public Connection getTransportConnection() { return transportConnection; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java index 39ecdf6260..a68999b8be 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java @@ -120,6 +120,8 @@ public interface RemotingConnection extends BufferHandler { */ ActiveMQBuffer createTransportBuffer(int size); + ActiveMQBuffer createTransportBuffer(int size, boolean pooled); + /** * called when the underlying connection fails. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index 7ab0c40986..a5fcf875b1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -35,6 +35,8 @@ public interface Connection { */ ActiveMQBuffer createTransportBuffer(int size); + ActiveMQBuffer createTransportBuffer(int size, boolean pooled); + RemotingConnection getProtocolConnection(); void setProtocolConnection(RemotingConnection connection); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 6beee36414..08c46be797 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -765,5 +765,10 @@ public class TestConversions extends Assert { public ByteBuffer toByteBuffer(int index, int length) { return null; } + + @Override + public void release() { + //no-op + } } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 446e362016..6143cf7409 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -132,6 +132,11 @@ public class MQTTConnection implements RemotingConnection { @Override public ActiveMQBuffer createTransportBuffer(int size) { + return createTransportBuffer(size, false); + } + + @Override + public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) { return transportConnection.createTransportBuffer(size); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 5dafe6075c..0eb81b9d5c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -293,6 +293,11 @@ public final class StompConnection implements RemotingConnection { @Override public ActiveMQBuffer createTransportBuffer(int size) { + return createTransportBuffer(size, false); + } + + @Override + public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java index f80834918d..78ebcb927e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler; import org.apache.activemq.artemis.core.server.cluster.qourum.Vote; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public class QuorumVoteMessage extends PacketImpl { @@ -40,6 +41,11 @@ public class QuorumVoteMessage extends PacketImpl { this.vote = vote; } + @Override + public ActiveMQBuffer encode(final RemotingConnection connection) { + return encode(connection,false); + } + @Override public void encodeRest(ActiveMQBuffer buffer) { super.encodeRest(buffer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 27fc5440ed..24931d3cc3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -146,7 +146,16 @@ public class InVMConnection implements Connection { @Override public ActiveMQBuffer createTransportBuffer(final int size) { - return ActiveMQBuffers.dynamicBuffer(size); + return createTransportBuffer(size, false); + } + + @Override + public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) { + if ( pooled ) { + return ActiveMQBuffers.pooledBuffer( size ); + } else { + return ActiveMQBuffers.dynamicBuffer( size ); + } } @Override @@ -173,9 +182,13 @@ public class InVMConnection implements Connection { final boolean flush, final boolean batch, final ChannelFutureListener futureListener) { - final ActiveMQBuffer copied = buffer.copy(0, buffer.capacity()); - copied.setIndex(buffer.readerIndex(), buffer.writerIndex()); + final ActiveMQBuffer copied = ActiveMQBuffers.pooledBuffer(buffer.capacity()); + int read = buffer.readerIndex(); + int writ = buffer.writerIndex(); + copied.writeBytes(buffer,read,writ - read); + copied.setIndex(read,writ); + buffer.setIndex(read,writ); try { executor.execute(new Runnable() { @@ -201,6 +214,10 @@ public class InVMConnection implements Connection { if (logger.isTraceEnabled()) { logger.trace(InVMConnection.this + "::packet sent done"); } + copied.release(); +// if ( copied.byteBuf().refCnt() > 0 ) { +// copied.release(); +// } } } }); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java index c559bf9a61..252b0eb57c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java @@ -286,13 +286,18 @@ public class MessageImplTest extends ActiveMQTestBase { } for (int i = 0; i < RUNS; i++) { + ActiveMQBuffer buf = null; try { SessionSendMessage ssm = new SessionSendMessage(msg); - ActiveMQBuffer buf = ssm.encode(null); + buf = ssm.encode(null); simulateRead(buf); } catch (Throwable e) { e.printStackTrace(); errors.incrementAndGet(); + } finally { + if ( buf != null ) { + buf.release(); + } } } }