diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index b4752082d7..93b61455d0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -75,8 +75,13 @@ public class AMQPMessage extends RefCountMessage { MessageImpl protonMessage; private volatile int memoryEstimate = -1; private long expiration = 0; - // this is to store where to start sending bytes, ignoring header and delivery annotations. - private int sendFrom = 0; + + // Records where the Header section ends if present. + private int headerEnds = 0; + + // Records where the message payload starts, ignoring DeliveryAnnotations if present + private int messagePaylodStart = 0; + private boolean parsedHeaders = false; private Header _header; private DeliveryAnnotations _deliveryAnnotations; @@ -130,14 +135,15 @@ public class AMQPMessage extends RefCountMessage { private void initalizeObjects() { if (protonMessage == null) { if (data == null) { - this.sendFrom = 0; + headerEnds = 0; + messagePaylodStart = 0; _header = new Header(); _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); _properties = new Properties(); - this.applicationProperties = new ApplicationProperties(new HashMap<>()); - this.protonMessage = (MessageImpl) Message.Factory.create(); - this.protonMessage.setApplicationProperties(applicationProperties); - this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations); + applicationProperties = new ApplicationProperties(new HashMap<>()); + protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setApplicationProperties(applicationProperties); + protonMessage.setDeliveryAnnotations(_deliveryAnnotations); } } } @@ -364,8 +370,9 @@ public class AMQPMessage extends RefCountMessage { } if (section instanceof Header) { - sendFrom = buffer.position(); _header = (Header) section; + headerEnds = buffer.position(); + messagePaylodStart = headerEnds; if (_header.getTtl() != null) { this.expiration = System.currentTimeMillis() + _header.getTtl().intValue(); @@ -376,13 +383,17 @@ public class AMQPMessage extends RefCountMessage { } else { section = null; } + } else { // meaning there is no header - sendFrom = 0; + headerEnds = 0; } if (section instanceof DeliveryAnnotations) { _deliveryAnnotations = (DeliveryAnnotations) section; - sendFrom = buffer.position(); + + // Advance the start beyond the delivery annotations so they are not written + // out on send of the message. + messagePaylodStart = buffer.position(); if (buffer.hasRemaining()) { section = (Section) decoder.readObject(); @@ -463,10 +474,14 @@ public class AMQPMessage extends RefCountMessage { checkBuffer(); byte[] origin = data.array(); - byte[] newData = new byte[data.array().length - sendFrom]; - for (int i = 0; i < newData.length; i++) { - newData[i] = origin[i + sendFrom]; - } + byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)]; + + // Copy the original header + System.arraycopy(origin, 0, newData, 0, headerEnds); + + // Copy the body following the delivery annotations if present + System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart); + AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData); newEncode.setDurable(isDurable()); return newEncode; @@ -550,7 +565,7 @@ public class AMQPMessage extends RefCountMessage { } if (getHeader() != null && getHeader().getDurable() != null) { - durable = getHeader().getDurable().booleanValue(); + durable = getHeader().getDurable(); return durable; } else { return durable != null ? durable : false; @@ -652,26 +667,35 @@ public class AMQPMessage extends RefCountMessage { public int getEncodeSize() { checkBuffer(); // + 20checkBuffer is an estimate for the Header with the deliveryCount - return data.array().length - sendFrom + 20; + return data.array().length - messagePaylodStart + 20; } @Override public void sendBuffer(ByteBuf buffer, int deliveryCount) { checkBuffer(); + + int amqpDeliveryCount = deliveryCount - 1; + Header header = getHeader(); - if (header == null && deliveryCount > 0) { + if (header == null && (amqpDeliveryCount > 0)) { header = new Header(); header.setDurable(durable); } - if (header != null) { + + // If the re-delivering the message then the header must be re-encoded + // otherwise we want to write the original header if present. + if (amqpDeliveryCount > 0) { synchronized (header) { - header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1)); + header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); TLSEncode.getEncoder().writeObject(header); TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); } + } else if (headerEnds > 0) { + buffer.writeBytes(data, 0, headerEnds); } - buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom); + + buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); } @Override @@ -1045,7 +1069,7 @@ public class AMQPMessage extends RefCountMessage { int size = record.readInt(); byte[] recordArray = new byte[size]; record.readBytes(recordArray); - this.sendFrom = 0; // whatever was persisted will be sent + this.messagePaylodStart = 0; // whatever was persisted will be sent this.data = Unpooled.wrappedBuffer(recordArray); this.bufferValid = true; this.durable = true; // it's coming from the journal, so it's durable