diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index 783cf00d00..0c66aeff20 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -86,8 +86,6 @@ public abstract class MessageImpl implements MessageInternal { private boolean copied = true; - private boolean bufferUsed; - private UUID userID; // Constructors -------------------------------------------------- @@ -157,8 +155,6 @@ public abstract class MessageImpl implements MessageInternal { copied = other.copied; if (other.buffer != null) { - other.bufferUsed = true; - // We need to copy the underlying buffer too, since the different messsages thereafter might have different // properties set on them, making their encoding different buffer = other.buffer.copy(0, other.buffer.writerIndex()); @@ -507,21 +503,7 @@ public abstract class MessageImpl implements MessageInternal { @Override public synchronized ActiveMQBuffer getEncodedBuffer() { ActiveMQBuffer buff = encodeToBuffer(); - - if (bufferUsed) { - ActiveMQBuffer copied = buff.copy(0, buff.capacity()); - - copied.setIndex(0, endOfMessagePosition); - - return copied; - } - else { - buffer.setIndex(0, endOfMessagePosition); - - bufferUsed = true; - - return buffer; - } + return buff.duplicate(); } @Override @@ -935,9 +917,12 @@ public abstract class MessageImpl implements MessageInternal { buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()]; bodyBuffer.readBytes(buffer2); bodyBuffer.readerIndex(readerIndex2); + return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1); + } + else { + return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1); } - return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1); } @Override @@ -962,18 +947,8 @@ public abstract class MessageImpl implements MessageInternal { // many queues - the first caller in this case will actually encode it private synchronized ActiveMQBuffer encodeToBuffer() { if (!bufferValid) { - if (bufferUsed) { - // Cannot use same buffer - must copy - - forceCopy(); - } - int bodySize = getEndOfBodyPosition(); - // Clebert: I've started sending this on encoding due to conversions between protocols - // and making sure we are not losing the buffer start position between protocols - this.endOfBodyPosition = bodySize; - // write it buffer.setInt(BUFFER_HEADER_SPACE, bodySize); @@ -1032,8 +1007,6 @@ public abstract class MessageImpl implements MessageInternal { if (bodyBuffer != null) { bodyBuffer.setBuffer(buffer); } - - bufferUsed = false; } // Inner classes ------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index 83fe33c907..bccc9df406 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -56,27 +56,31 @@ public class SessionReceiveMessage extends MessagePacket { public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = message.getEncodedBuffer(); + ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex()); + bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity()); + bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); + // Sanity check - if (buffer.writerIndex() != message.getEndOfMessagePosition()) { + if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) { throw new IllegalStateException("Wrong encode position"); } - buffer.writeLong(consumerID); - buffer.writeInt(deliveryCount); + bufferWrite.writeLong(consumerID); + bufferWrite.writeInt(deliveryCount); - size = buffer.writerIndex(); + size = bufferWrite.writerIndex(); // Write standard headers int len = size - DataConstants.SIZE_INT; - buffer.setInt(0, len); - buffer.setByte(DataConstants.SIZE_INT, getType()); - buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); + bufferWrite.setInt(0, len); + bufferWrite.setByte(DataConstants.SIZE_INT, getType()); + bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); // Position reader for reading by Netty - buffer.setIndex(0, size); + bufferWrite.setIndex(0, size); - return buffer; + return bufferWrite; } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index 525f00a9cb..300f8ed0de 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -63,28 +63,39 @@ public class SessionSendMessage extends MessagePacket { public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = message.getEncodedBuffer(); + ActiveMQBuffer bufferWrite; + if (connection == null) { + // this is for unit tests only + bufferWrite = buffer.copy(0, buffer.capacity()); + } + else { + bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse + } + bufferWrite.writeBytes(buffer, 0, buffer.writerIndex()); + bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex()); + // Sanity check - if (buffer.writerIndex() != message.getEndOfMessagePosition()) { + if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) { throw new IllegalStateException("Wrong encode position"); } - buffer.writeBoolean(requiresResponse); + bufferWrite.writeBoolean(requiresResponse); - size = buffer.writerIndex(); + size = bufferWrite.writerIndex(); // Write standard headers int len = size - DataConstants.SIZE_INT; - buffer.setInt(0, len); - buffer.setByte(DataConstants.SIZE_INT, getType()); - buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); + bufferWrite.setInt(0, len); + bufferWrite.setByte(DataConstants.SIZE_INT, getType()); + bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID); // Position reader for reading by Netty - buffer.readerIndex(0); + bufferWrite.readerIndex(0); message.resetCopied(); - return buffer; + return bufferWrite; } @Override