ARTEMIS-3019 Before applying the fix into two methods, I needed to move the method into the right place

There is no real change in this commit other than moving the methods to the right place
This commit is contained in:
Clebert Suconic 2020-12-04 13:44:27 -05:00
parent 12cd10a43c
commit ccefbfcfa9
2 changed files with 69 additions and 64 deletions

View File

@ -35,12 +35,18 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
import org.apache.qpid.proton.codec.WritableBuffer;
public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage {
@ -195,6 +201,69 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
}
private void saveEncoding(ByteBuf buf) {
WritableBuffer oldBuffer = TLSEncode.getEncoder().getBuffer();
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
try {
buf.writeInt(headerPosition);
buf.writeInt(encodedHeaderSize);
TLSEncode.getEncoder().writeObject(header);
buf.writeInt(deliveryAnnotationsPosition);
buf.writeInt(encodedDeliveryAnnotationsSize);
buf.writeInt(messageAnnotationsPosition);
TLSEncode.getEncoder().writeObject(messageAnnotations);
buf.writeInt(propertiesPosition);
TLSEncode.getEncoder().writeObject(properties);
buf.writeInt(applicationPropertiesPosition);
buf.writeInt(remainingBodyPosition);
TLSEncode.getEncoder().writeObject(applicationProperties);
} finally {
TLSEncode.getEncoder().setByteBuffer(oldBuffer);
}
}
protected void readSavedEncoding(ByteBuf buf) {
ReadableBuffer oldBuffer = TLSEncode.getDecoder().getBuffer();
TLSEncode.getDecoder().setBuffer(new NettyReadable(buf));
try {
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
headerPosition = buf.readInt();
encodedHeaderSize = buf.readInt();
header = (Header)TLSEncode.getDecoder().readObject();
deliveryAnnotationsPosition = buf.readInt();
encodedDeliveryAnnotationsSize = buf.readInt();
messageAnnotationsPosition = buf.readInt();
messageAnnotations = (MessageAnnotations)TLSEncode.getDecoder().readObject();
propertiesPosition = buf.readInt();
properties = (Properties)TLSEncode.getDecoder().readObject();
applicationPropertiesPosition = buf.readInt();
remainingBodyPosition = buf.readInt();
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
} finally {
TLSEncode.getDecoder().setBuffer(oldBuffer);
}
}
@Override
public void finishParse() throws Exception {
openLargeMessage();

View File

@ -565,70 +565,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return remainingBodyPosition > 0 ? remainingBodyPosition : 1024;
}
protected void saveEncoding(ByteBuf buf) {
WritableBuffer oldBuffer = TLSEncode.getEncoder().getBuffer();
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
try {
buf.writeInt(headerPosition);
buf.writeInt(encodedHeaderSize);
TLSEncode.getEncoder().writeObject(header);
buf.writeInt(deliveryAnnotationsPosition);
buf.writeInt(encodedDeliveryAnnotationsSize);
buf.writeInt(messageAnnotationsPosition);
TLSEncode.getEncoder().writeObject(messageAnnotations);
buf.writeInt(propertiesPosition);
TLSEncode.getEncoder().writeObject(properties);
buf.writeInt(applicationPropertiesPosition);
buf.writeInt(remainingBodyPosition);
TLSEncode.getEncoder().writeObject(applicationProperties);
} finally {
TLSEncode.getEncoder().setByteBuffer(oldBuffer);
}
}
protected void readSavedEncoding(ByteBuf buf) {
ReadableBuffer oldBuffer = TLSEncode.getDecoder().getBuffer();
TLSEncode.getDecoder().setBuffer(new NettyReadable(buf));
try {
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
headerPosition = buf.readInt();
encodedHeaderSize = buf.readInt();
header = (Header)TLSEncode.getDecoder().readObject();
deliveryAnnotationsPosition = buf.readInt();
encodedDeliveryAnnotationsSize = buf.readInt();
messageAnnotationsPosition = buf.readInt();
messageAnnotations = (MessageAnnotations)TLSEncode.getDecoder().readObject();
propertiesPosition = buf.readInt();
properties = (Properties)TLSEncode.getDecoder().readObject();
applicationPropertiesPosition = buf.readInt();
remainingBodyPosition = buf.readInt();
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
} finally {
TLSEncode.getDecoder().setBuffer(oldBuffer);
}
}
protected synchronized void resetMessageData() {
header = null;
messageAnnotations = null;