From d3233e45f6adf258d2d02be70f3f5e297caa3e8f Mon Sep 17 00:00:00 2001 From: Carsten Lohmann Date: Tue, 2 Oct 2018 16:45:18 +0200 Subject: [PATCH] ARTEMIS-2045 Add support for setting delivery annotations on outgoing message --- .../protocol/amqp/broker/AMQPMessage.java | 63 +++++++++++---- .../protocol/amqp/broker/AMQPMessageTest.java | 76 +++++++++++++++++++ 2 files changed, 126 insertions(+), 13 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 990ec3a054..14606a4542 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -60,6 +60,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.DroppingWritableBuffer; import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.TypeConstructor; @@ -121,6 +122,7 @@ public class AMQPMessage extends RefCountMessage { private String connectionID; private final CoreMessageObjectPools coreMessageObjectPools; private Set rejectedConsumers; + private DeliveryAnnotations deliveryAnnotationsForSendBuffer; // These are properties set at the broker level and carried only internally by broker storage. private volatile TypedProperties extraProperties; @@ -239,6 +241,21 @@ public class AMQPMessage extends RefCountMessage { return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class); } + /** + * Sets the delivery annotations to be included when encoding the message for sending it on the wire. + * + * The broker can add additional message annotations as long as the annotations being added follow the + * rules from the spec. If the user adds something that the remote doesn't understand and it is not + * prefixed with "x-opt" the remote can just kill the link. See: + * + * http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-annotations + * + * @param deliveryAnnotations delivery annotations used in the sendBuffer() method + */ + public void setDeliveryAnnotationsForSendBuffer(DeliveryAnnotations deliveryAnnotations) { + this.deliveryAnnotationsForSendBuffer = deliveryAnnotations; + } + /** * Returns a copy of the DeliveryAnnotations in the message if present or null. Changes to the * returned MessageAnnotations instance do not affect the original Message. @@ -545,26 +562,27 @@ public class AMQPMessage extends RefCountMessage { if (deliveryCount > 1) { return createCopyWithNewDeliveryCount(deliveryCount); - } else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT) { - return createCopyWithoutDeliveryAnnotations(); + } else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT + || (deliveryAnnotationsForSendBuffer != null && !deliveryAnnotationsForSendBuffer.getValue().isEmpty())) { + return createCopyWithSkippedOrExplicitlySetDeliveryAnnotations(); } else { - // Common case message has no delivery annotations and this is the first delivery - // so no re-encoding or section skipping needed. + // Common case message has no delivery annotations, no delivery annotations for the send buffer were set + // and this is the first delivery so no re-encoding or section skipping needed. return data.duplicate(); } } - private ReadableBuffer createCopyWithoutDeliveryAnnotations() { - assert deliveryAnnotationsPosition != VALUE_NOT_PRESENT; - - // The original message had delivery annotations and so we must copy into a new - // buffer skipping the delivery annotations section as that is not meant to survive - // beyond this hop. + private ReadableBuffer createCopyWithSkippedOrExplicitlySetDeliveryAnnotations() { + // The original message had delivery annotations, or delivery annotations for the send buffer are set. + // That means we must copy into a new buffer skipping the original delivery annotations section + // (not meant to survive beyond this hop) and including the delivery annotations for the send buffer if set. ReadableBuffer duplicate = data.duplicate(); final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize()); result.writeBytes(duplicate.limit(encodedHeaderSize).byteBuffer()); + writeDeliveryAnnotationsForSendBuffer(result); duplicate.clear(); + // skip existing delivery annotations of the original message duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize); result.writeBytes(duplicate.byteBuffer()); @@ -594,8 +612,8 @@ public class AMQPMessage extends RefCountMessage { TLSEncode.getEncoder().writeObject(header); TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); - // This will skip any existing delivery annotations that might have been present - // in the original message. + writeDeliveryAnnotationsForSendBuffer(result); + // skip existing delivery annotations of the original message data.position(encodedHeaderSize + encodedDeliveryAnnotationsSize); result.writeBytes(data.byteBuffer()); data.position(0); @@ -603,6 +621,25 @@ public class AMQPMessage extends RefCountMessage { return new NettyReadable(result); } + private void writeDeliveryAnnotationsForSendBuffer(ByteBuf result) { + if (deliveryAnnotationsForSendBuffer != null && !deliveryAnnotationsForSendBuffer.getValue().isEmpty()) { + TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result)); + TLSEncode.getEncoder().writeObject(deliveryAnnotationsForSendBuffer); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); + } + } + + private int getDeliveryAnnotationsForSendBufferSize() { + if (deliveryAnnotationsForSendBuffer == null || deliveryAnnotationsForSendBuffer.getValue().isEmpty()) { + return 0; + } + DroppingWritableBuffer droppingWritableBuffer = new DroppingWritableBuffer(); + TLSEncode.getEncoder().setByteBuffer(droppingWritableBuffer); + TLSEncode.getEncoder().writeObject(deliveryAnnotationsForSendBuffer); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); + return droppingWritableBuffer.position() + 1; + } + @Override public void messageChanged() { modified = true; @@ -632,7 +669,7 @@ public class AMQPMessage extends RefCountMessage { public int getEncodeSize() { ensureDataIsValid(); // The encoded size will exclude any delivery annotations that are present as we will clip them. - return data.remaining() - encodedDeliveryAnnotationsSize; + return data.remaining() - encodedDeliveryAnnotationsSize + getDeliveryAnnotationsForSendBufferSize(); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 12b91ea573..88414b4788 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -1920,6 +1920,82 @@ public class AMQPMessageTest { } } + @Test + public void testGetSendBufferWithoutDeliveryAnnotations() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + Header header = new Header(); + header.setDeliveryCount(new UnsignedInteger(1)); + protonMessage.setHeader(header); + Properties properties = new Properties(); + properties.setTo("someNiceLocal"); + protonMessage.setProperties(properties); + protonMessage.setBody(new AmqpValue("Sample payload")); + + DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); + final String annotationKey = "annotationKey"; + final String annotationValue = "annotationValue"; + deliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue); + protonMessage.setDeliveryAnnotations(deliveryAnnotations); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + ReadableBuffer sendBuffer = decoded.getSendBuffer(1); + assertEquals(decoded.getEncodeSize(), sendBuffer.capacity()); + AMQPMessage msgFromSendBuffer = new AMQPMessage(0, sendBuffer, null, null); + assertEquals("someNiceLocal", msgFromSendBuffer.getAddress()); + assertNull(msgFromSendBuffer.getDeliveryAnnotations()); + + // again with higher deliveryCount + ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5); + assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity()); + AMQPMessage msgFromSendBuffer2 = new AMQPMessage(0, sendBuffer2, null, null); + assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress()); + assertNull(msgFromSendBuffer2.getDeliveryAnnotations()); + } + + @Test + public void testGetSendBufferWithDeliveryAnnotations() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + Header header = new Header(); + header.setDeliveryCount(new UnsignedInteger(1)); + protonMessage.setHeader(header); + Properties properties = new Properties(); + properties.setTo("someNiceLocal"); + protonMessage.setProperties(properties); + protonMessage.setBody(new AmqpValue("Sample payload")); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + DeliveryAnnotations newDeliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); + final String annotationKey = "annotationKey"; + final String annotationValue = "annotationValue"; + newDeliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue); + decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations); + + ReadableBuffer sendBuffer = decoded.getSendBuffer(1); + assertEquals(decoded.getEncodeSize(), sendBuffer.capacity()); + AMQPMessage msgFromSendBuffer = new AMQPMessage(0, sendBuffer, null, null); + assertEquals("someNiceLocal", msgFromSendBuffer.getAddress()); + assertNotNull(msgFromSendBuffer.getDeliveryAnnotations()); + assertEquals(1, msgFromSendBuffer.getDeliveryAnnotations().getValue().size()); + assertEquals(annotationValue, msgFromSendBuffer.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey))); + + // again with higher deliveryCount + DeliveryAnnotations newDeliveryAnnotations2 = new DeliveryAnnotations(new HashMap<>()); + final String annotationKey2 = "annotationKey2"; + final String annotationValue2 = "annotationValue2"; + newDeliveryAnnotations2.getValue().put(Symbol.getSymbol(annotationKey2), annotationValue2); + decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations2); + + ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5); + assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity()); + AMQPMessage msgFromSendBuffer2 = new AMQPMessage(0, sendBuffer2, null, null); + assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress()); + assertNotNull(msgFromSendBuffer2.getDeliveryAnnotations()); + assertEquals(1, msgFromSendBuffer2.getDeliveryAnnotations().getValue().size()); + assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2))); + } + //----- Test Support ------------------------------------------------------// private MessageImpl createProtonMessage() {