ARTEMIS-4725 Fix AMQP outgoing encoding if da encoded before header

Fix the AMQP message scanning to account for the header not being at the
front of the buffer which also accounts for odd case of broker storing
message with delivery annotations ahead of the header.
This commit is contained in:
Timothy Bish 2024-04-16 14:02:35 -04:00 committed by clebertsuconic
parent 7072eb187a
commit 329d963717
2 changed files with 106 additions and 1 deletions

View File

@ -700,7 +700,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
if (Header.class.equals(constructor.getTypeClass())) {
header = (Header) constructor.readValue();
headerPosition = constructorPos;
encodedHeaderSize = data.position();
encodedHeaderSize = data.position() - constructorPos;
if (header.getTtl() != null) {
if (!expirationReload) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
@ -778,6 +778,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
* @return a Netty ByteBuf containing the encoded bytes of this Message instance.
*/
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) {
ensureMessageDataScanned();
ensureDataIsValid();
DeliveryAnnotations daToWrite = reference != null ? reference.getProtocolData(DeliveryAnnotations.class) : null;
@ -825,6 +826,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
writeDeliveryAnnotationsForSendBuffer(result, deliveryAnnotations);
// skip existing delivery annotations of the original message
duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
result.writeBytes(duplicate.byteBuffer());

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
@ -73,14 +74,18 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -113,6 +118,12 @@ public class AMQPMessageTest {
private byte[] encodedProtonMessage;
private final DecoderImpl decoder = new DecoderImpl();
private final EncoderImpl encoder = new EncoderImpl(decoder);
{
AMQPDefinedTypes.registerAllTypes(decoder, encoder);
}
@Before
public void setUp() {
encodedProtonMessage = encodeMessage(createProtonMessage());
@ -2538,6 +2549,98 @@ public class AMQPMessageTest {
assertEquals(map.get("secondLong"), 1234567L);
}
@Test
public void testEncodedAMQPMessageHasReversedHeaderAndDA() throws Exception {
final Header header = new Header();
header.setDurable(true);
final DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da");
final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma");
final ByteBuf nettyBuffer = Unpooled.buffer(1500);
WritableBuffer buffer = new NettyWritable(nettyBuffer);
final MessageReference reference = Mockito.mock(MessageReference.class);
try {
encoder.setByteBuffer(buffer);
encoder.writeObject(deliveryAnnotations);
encoder.writeObject(header);
encoder.writeObject(messageAnnotations);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
}
final byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
final AMQPMessage message = new AMQPStandardMessage(0, bytes, null);
final ReadableBuffer encoded = message.getSendBuffer(0, reference);
final Message protonMessage = Proton.message();
protonMessage.decode(encoded);
final Header readHeader = protonMessage.getHeader();
final DeliveryAnnotations readDeliveryAnnotations = protonMessage.getDeliveryAnnotations();
final MessageAnnotations readMessageAnnotations = protonMessage.getMessageAnnotations();
assertTrue(readHeader.getDurable());
assertNull(readDeliveryAnnotations);
assertNotNull(readMessageAnnotations);
assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
}
@Test
public void testEncodedAMQPMessageHasReversedHeaderAndDAWithOutgoingDeliveryAnnotations() throws Exception {
final Header header = new Header();
header.setDurable(true);
final DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da");
final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma");
final ByteBuf nettyBuffer = Unpooled.buffer(1500);
WritableBuffer buffer = new NettyWritable(nettyBuffer);
final MessageReference reference = Mockito.mock(MessageReference.class);
final DeliveryAnnotations deliveryAnnotationsOut = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotationsOut.getValue().put(Symbol.valueOf("new-da"), "new-da");
Mockito.when(reference.getProtocolData(DeliveryAnnotations.class)).thenReturn(deliveryAnnotationsOut);
try {
encoder.setByteBuffer(buffer);
encoder.writeObject(deliveryAnnotations);
encoder.writeObject(header);
encoder.writeObject(messageAnnotations);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
}
final byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
final AMQPMessage message = new AMQPStandardMessage(0, bytes, null);
final ReadableBuffer encoded = message.getSendBuffer(0, reference);
final Message protonMessage = Proton.message();
protonMessage.decode(encoded);
final Header readHeader = protonMessage.getHeader();
final DeliveryAnnotations readDeliveryAnnotations = protonMessage.getDeliveryAnnotations();
final MessageAnnotations readMessageAnnotations = protonMessage.getMessageAnnotations();
assertTrue(readHeader.getDurable());
assertNotNull(readDeliveryAnnotations);
assertEquals("new-da", readDeliveryAnnotations.getValue().get(Symbol.valueOf("new-da")));
assertNotNull(readMessageAnnotations);
assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
}
//----- Test Support ------------------------------------------------------//
private MessageImpl createProtonMessage() {