From 329d963717d5e73e22d181e251fae20fc5c73809 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 16 Apr 2024 14:02:35 -0400 Subject: [PATCH] ARTEMIS-4725 Fix AMQP outgoing encoding if da encoded before header Fix the AMQP message scanning to account for the header not being at the front of the buffer which also accounts for odd case of broker storing message with delivery annotations ahead of the header. --- .../protocol/amqp/broker/AMQPMessage.java | 4 +- .../protocol/amqp/broker/AMQPMessageTest.java | 103 ++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index e10ae2b9e2..a4922c2379 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -700,7 +700,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. if (Header.class.equals(constructor.getTypeClass())) { header = (Header) constructor.readValue(); headerPosition = constructorPos; - encodedHeaderSize = data.position(); + encodedHeaderSize = data.position() - constructorPos; if (header.getTtl() != null) { if (!expirationReload) { expiration = System.currentTimeMillis() + header.getTtl().intValue(); @@ -778,6 +778,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. * @return a Netty ByteBuf containing the encoded bytes of this Message instance. */ public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) { + ensureMessageDataScanned(); ensureDataIsValid(); DeliveryAnnotations daToWrite = reference != null ? reference.getProtocolData(DeliveryAnnotations.class) : null; @@ -825,6 +826,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } writeDeliveryAnnotationsForSendBuffer(result, deliveryAnnotations); + // skip existing delivery annotations of the original message duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize); result.writeBytes(duplicate.byteBuffer()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 9114ef0bd1..a695141f32 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; @@ -73,14 +74,18 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.EncodingCodes; import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -113,6 +118,12 @@ public class AMQPMessageTest { private byte[] encodedProtonMessage; + private final DecoderImpl decoder = new DecoderImpl(); + private final EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + } + @Before public void setUp() { encodedProtonMessage = encodeMessage(createProtonMessage()); @@ -2538,6 +2549,98 @@ public class AMQPMessageTest { assertEquals(map.get("secondLong"), 1234567L); } + @Test + public void testEncodedAMQPMessageHasReversedHeaderAndDA() throws Exception { + final Header header = new Header(); + header.setDurable(true); + + final DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); + deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da"); + + final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>()); + messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma"); + + final ByteBuf nettyBuffer = Unpooled.buffer(1500); + WritableBuffer buffer = new NettyWritable(nettyBuffer); + + final MessageReference reference = Mockito.mock(MessageReference.class); + + try { + encoder.setByteBuffer(buffer); + encoder.writeObject(deliveryAnnotations); + encoder.writeObject(header); + encoder.writeObject(messageAnnotations); + } finally { + encoder.setByteBuffer((WritableBuffer) null); + } + + final byte[] bytes = new byte[nettyBuffer.writerIndex()]; + nettyBuffer.readBytes(bytes); + + final AMQPMessage message = new AMQPStandardMessage(0, bytes, null); + final ReadableBuffer encoded = message.getSendBuffer(0, reference); + + final Message protonMessage = Proton.message(); + protonMessage.decode(encoded); + + final Header readHeader = protonMessage.getHeader(); + final DeliveryAnnotations readDeliveryAnnotations = protonMessage.getDeliveryAnnotations(); + final MessageAnnotations readMessageAnnotations = protonMessage.getMessageAnnotations(); + + assertTrue(readHeader.getDurable()); + assertNull(readDeliveryAnnotations); + assertNotNull(readMessageAnnotations); + assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma"))); + } + + @Test + public void testEncodedAMQPMessageHasReversedHeaderAndDAWithOutgoingDeliveryAnnotations() throws Exception { + final Header header = new Header(); + header.setDurable(true); + + final DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); + deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da"); + + final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>()); + messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma"); + + final ByteBuf nettyBuffer = Unpooled.buffer(1500); + WritableBuffer buffer = new NettyWritable(nettyBuffer); + + final MessageReference reference = Mockito.mock(MessageReference.class); + final DeliveryAnnotations deliveryAnnotationsOut = new DeliveryAnnotations(new HashMap<>()); + deliveryAnnotationsOut.getValue().put(Symbol.valueOf("new-da"), "new-da"); + Mockito.when(reference.getProtocolData(DeliveryAnnotations.class)).thenReturn(deliveryAnnotationsOut); + + try { + encoder.setByteBuffer(buffer); + encoder.writeObject(deliveryAnnotations); + encoder.writeObject(header); + encoder.writeObject(messageAnnotations); + } finally { + encoder.setByteBuffer((WritableBuffer) null); + } + + final byte[] bytes = new byte[nettyBuffer.writerIndex()]; + nettyBuffer.readBytes(bytes); + + final AMQPMessage message = new AMQPStandardMessage(0, bytes, null); + final ReadableBuffer encoded = message.getSendBuffer(0, reference); + + final Message protonMessage = Proton.message(); + protonMessage.decode(encoded); + + final Header readHeader = protonMessage.getHeader(); + final DeliveryAnnotations readDeliveryAnnotations = protonMessage.getDeliveryAnnotations(); + final MessageAnnotations readMessageAnnotations = protonMessage.getMessageAnnotations(); + + assertTrue(readHeader.getDurable()); + assertNotNull(readDeliveryAnnotations); + assertEquals("new-da", readDeliveryAnnotations.getValue().get(Symbol.valueOf("new-da"))); + assertNotNull(readMessageAnnotations); + assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma"))); + } + //----- Test Support ------------------------------------------------------// private MessageImpl createProtonMessage() {