This closes #337

This commit is contained in:
Clebert Suconic 2016-01-23 01:01:41 -05:00
commit 4a33d2d489
3 changed files with 37 additions and 49 deletions

View File

@ -86,8 +86,6 @@ public abstract class MessageImpl implements MessageInternal {
private boolean copied = true;
private boolean bufferUsed;
private UUID userID;
// Constructors --------------------------------------------------
@ -157,8 +155,6 @@ public abstract class MessageImpl implements MessageInternal {
copied = other.copied;
if (other.buffer != null) {
other.bufferUsed = true;
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
// properties set on them, making their encoding different
buffer = other.buffer.copy(0, other.buffer.writerIndex());
@ -507,21 +503,7 @@ public abstract class MessageImpl implements MessageInternal {
@Override
public synchronized ActiveMQBuffer getEncodedBuffer() {
ActiveMQBuffer buff = encodeToBuffer();
if (bufferUsed) {
ActiveMQBuffer copied = buff.copy(0, buff.capacity());
copied.setIndex(0, endOfMessagePosition);
return copied;
}
else {
buffer.setIndex(0, endOfMessagePosition);
bufferUsed = true;
return buffer;
}
return buff.duplicate();
}
@Override
@ -935,9 +917,12 @@ public abstract class MessageImpl implements MessageInternal {
buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
bodyBuffer.readBytes(buffer2);
bodyBuffer.readerIndex(readerIndex2);
return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
}
else {
return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1);
}
return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
}
@Override
@ -962,18 +947,8 @@ public abstract class MessageImpl implements MessageInternal {
// many queues - the first caller in this case will actually encode it
private synchronized ActiveMQBuffer encodeToBuffer() {
if (!bufferValid) {
if (bufferUsed) {
// Cannot use same buffer - must copy
forceCopy();
}
int bodySize = getEndOfBodyPosition();
// Clebert: I've started sending this on encoding due to conversions between protocols
// and making sure we are not losing the buffer start position between protocols
this.endOfBodyPosition = bodySize;
// write it
buffer.setInt(BUFFER_HEADER_SPACE, bodySize);
@ -1032,8 +1007,6 @@ public abstract class MessageImpl implements MessageInternal {
if (bodyBuffer != null) {
bodyBuffer.setBuffer(buffer);
}
bufferUsed = false;
}
// Inner classes -------------------------------------------------

View File

@ -56,27 +56,31 @@ public class SessionReceiveMessage extends MessagePacket {
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer();
ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex());
bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
// Sanity check
if (buffer.writerIndex() != message.getEndOfMessagePosition()) {
if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
throw new IllegalStateException("Wrong encode position");
}
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
bufferWrite.writeLong(consumerID);
bufferWrite.writeInt(deliveryCount);
size = buffer.writerIndex();
size = bufferWrite.writerIndex();
// Write standard headers
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
buffer.setByte(DataConstants.SIZE_INT, getType());
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
bufferWrite.setInt(0, len);
bufferWrite.setByte(DataConstants.SIZE_INT, getType());
bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
// Position reader for reading by Netty
buffer.setIndex(0, size);
bufferWrite.setIndex(0, size);
return buffer;
return bufferWrite;
}
@Override

View File

@ -63,28 +63,39 @@ public class SessionSendMessage extends MessagePacket {
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer();
ActiveMQBuffer bufferWrite;
if (connection == null) {
// this is for unit tests only
bufferWrite = buffer.copy(0, buffer.capacity());
}
else {
bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse
}
bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
// Sanity check
if (buffer.writerIndex() != message.getEndOfMessagePosition()) {
if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
throw new IllegalStateException("Wrong encode position");
}
buffer.writeBoolean(requiresResponse);
bufferWrite.writeBoolean(requiresResponse);
size = buffer.writerIndex();
size = bufferWrite.writerIndex();
// Write standard headers
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
buffer.setByte(DataConstants.SIZE_INT, getType());
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
bufferWrite.setInt(0, len);
bufferWrite.setByte(DataConstants.SIZE_INT, getType());
bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
// Position reader for reading by Netty
buffer.readerIndex(0);
bufferWrite.readerIndex(0);
message.resetCopied();
return buffer;
return bufferWrite;
}
@Override