ARTEMIS-1359 Skip re-encode Headers on messages if not needed
Only reencode the Header on a Message when the redelivering the Message to avoid overhead and unneeded modification to the original encoding of the Header.
This commit is contained in:
parent
47485ce809
commit
cb9482d9fa
|
@ -75,8 +75,13 @@ public class AMQPMessage extends RefCountMessage {
|
|||
MessageImpl protonMessage;
|
||||
private volatile int memoryEstimate = -1;
|
||||
private long expiration = 0;
|
||||
// this is to store where to start sending bytes, ignoring header and delivery annotations.
|
||||
private int sendFrom = 0;
|
||||
|
||||
// Records where the Header section ends if present.
|
||||
private int headerEnds = 0;
|
||||
|
||||
// Records where the message payload starts, ignoring DeliveryAnnotations if present
|
||||
private int messagePaylodStart = 0;
|
||||
|
||||
private boolean parsedHeaders = false;
|
||||
private Header _header;
|
||||
private DeliveryAnnotations _deliveryAnnotations;
|
||||
|
@ -130,14 +135,15 @@ public class AMQPMessage extends RefCountMessage {
|
|||
private void initalizeObjects() {
|
||||
if (protonMessage == null) {
|
||||
if (data == null) {
|
||||
this.sendFrom = 0;
|
||||
headerEnds = 0;
|
||||
messagePaylodStart = 0;
|
||||
_header = new Header();
|
||||
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
|
||||
_properties = new Properties();
|
||||
this.applicationProperties = new ApplicationProperties(new HashMap<>());
|
||||
this.protonMessage = (MessageImpl) Message.Factory.create();
|
||||
this.protonMessage.setApplicationProperties(applicationProperties);
|
||||
this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
|
||||
applicationProperties = new ApplicationProperties(new HashMap<>());
|
||||
protonMessage = (MessageImpl) Message.Factory.create();
|
||||
protonMessage.setApplicationProperties(applicationProperties);
|
||||
protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -364,8 +370,9 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
|
||||
if (section instanceof Header) {
|
||||
sendFrom = buffer.position();
|
||||
_header = (Header) section;
|
||||
headerEnds = buffer.position();
|
||||
messagePaylodStart = headerEnds;
|
||||
|
||||
if (_header.getTtl() != null) {
|
||||
this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
|
||||
|
@ -376,13 +383,17 @@ public class AMQPMessage extends RefCountMessage {
|
|||
} else {
|
||||
section = null;
|
||||
}
|
||||
|
||||
} else {
|
||||
// meaning there is no header
|
||||
sendFrom = 0;
|
||||
headerEnds = 0;
|
||||
}
|
||||
if (section instanceof DeliveryAnnotations) {
|
||||
_deliveryAnnotations = (DeliveryAnnotations) section;
|
||||
sendFrom = buffer.position();
|
||||
|
||||
// Advance the start beyond the delivery annotations so they are not written
|
||||
// out on send of the message.
|
||||
messagePaylodStart = buffer.position();
|
||||
|
||||
if (buffer.hasRemaining()) {
|
||||
section = (Section) decoder.readObject();
|
||||
|
@ -463,10 +474,14 @@ public class AMQPMessage extends RefCountMessage {
|
|||
checkBuffer();
|
||||
|
||||
byte[] origin = data.array();
|
||||
byte[] newData = new byte[data.array().length - sendFrom];
|
||||
for (int i = 0; i < newData.length; i++) {
|
||||
newData[i] = origin[i + sendFrom];
|
||||
}
|
||||
byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)];
|
||||
|
||||
// Copy the original header
|
||||
System.arraycopy(origin, 0, newData, 0, headerEnds);
|
||||
|
||||
// Copy the body following the delivery annotations if present
|
||||
System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
|
||||
|
||||
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
|
||||
newEncode.setDurable(isDurable());
|
||||
return newEncode;
|
||||
|
@ -550,7 +565,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
|
||||
if (getHeader() != null && getHeader().getDurable() != null) {
|
||||
durable = getHeader().getDurable().booleanValue();
|
||||
durable = getHeader().getDurable();
|
||||
return durable;
|
||||
} else {
|
||||
return durable != null ? durable : false;
|
||||
|
@ -652,26 +667,35 @@ public class AMQPMessage extends RefCountMessage {
|
|||
public int getEncodeSize() {
|
||||
checkBuffer();
|
||||
// + 20checkBuffer is an estimate for the Header with the deliveryCount
|
||||
return data.array().length - sendFrom + 20;
|
||||
return data.array().length - messagePaylodStart + 20;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendBuffer(ByteBuf buffer, int deliveryCount) {
|
||||
checkBuffer();
|
||||
|
||||
int amqpDeliveryCount = deliveryCount - 1;
|
||||
|
||||
Header header = getHeader();
|
||||
if (header == null && deliveryCount > 0) {
|
||||
if (header == null && (amqpDeliveryCount > 0)) {
|
||||
header = new Header();
|
||||
header.setDurable(durable);
|
||||
}
|
||||
if (header != null) {
|
||||
|
||||
// If the re-delivering the message then the header must be re-encoded
|
||||
// otherwise we want to write the original header if present.
|
||||
if (amqpDeliveryCount > 0) {
|
||||
synchronized (header) {
|
||||
header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
|
||||
header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
|
||||
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
|
||||
TLSEncode.getEncoder().writeObject(header);
|
||||
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
|
||||
}
|
||||
} else if (headerEnds > 0) {
|
||||
buffer.writeBytes(data, 0, headerEnds);
|
||||
}
|
||||
buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
|
||||
|
||||
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1045,7 +1069,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
int size = record.readInt();
|
||||
byte[] recordArray = new byte[size];
|
||||
record.readBytes(recordArray);
|
||||
this.sendFrom = 0; // whatever was persisted will be sent
|
||||
this.messagePaylodStart = 0; // whatever was persisted will be sent
|
||||
this.data = Unpooled.wrappedBuffer(recordArray);
|
||||
this.bufferValid = true;
|
||||
this.durable = true; // it's coming from the journal, so it's durable
|
||||
|
|
Loading…
Reference in New Issue