ARTEMIS-4431 Re-encode the AMQP message annotations if hops are updates

When updating or adding the hops value the AMQP message needs a re-encode to
carry that value forward when the message is sent to the next broker.
This commit is contained in:
Timothy Bish 2023-09-15 11:08:13 -04:00 committed by Robbie Gemmell
parent 784aa9f884
commit e8d92b3bc5
2 changed files with 50 additions and 7 deletions

View File

@ -374,6 +374,9 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
message.setAnnotation(MESSAGE_HOPS_ANNOTATION, numHops.intValue() + 1);
}
// Annotations need to be rewritten to carry the change forward.
message.reencode();
return message;
}

View File

@ -87,6 +87,7 @@ import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotation
import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
import org.hamcrest.Matchers;
import org.jgroups.util.UUID;
import org.junit.Test;
import org.slf4j.Logger;
@ -388,16 +389,55 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
// Induce demand on the local broker which should then create a receiver to our remote peer.
try (ProtonTestClient receivingPeer = new ProtonTestClient()) {
receivingPeer.queueClientSaslAnonymousConnect();
receivingPeer.connect("localhost", AMQP_PORT);
receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTopic("test"));
connection.start();
receivingPeer.expectOpen();
receivingPeer.expectBegin();
receivingPeer.expectAttach();
receivingPeer.remoteOpen().withContainerId("test-sender").now();
receivingPeer.remoteBegin().now();
receivingPeer.remoteAttach().ofReceiver()
.withInitialDeliveryCount(0)
.withName("sending-peer")
.withSource().withAddress("test")
.withCapabilities("topic").also()
.withTarget().also()
.now();
receivingPeer.remoteFlow().withLinkCredit(10).now();
receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
// Check that annotation for hops is present in the forwarded message.
final HeaderMatcher headerMatcher = new HeaderMatcher(true);
final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true);
annotationsMatcher.withEntry("x-opt-test", Matchers.equalTo("test"));
annotationsMatcher.withEntry(MESSAGE_HOPS_ANNOTATION.toString(), Matchers.equalTo(1));
final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher("Hello World");
final TransferPayloadCompositeMatcher matcher = new TransferPayloadCompositeMatcher();
matcher.setHeadersMatcher(headerMatcher);
matcher.setMessageAnnotationsMatcher(annotationsMatcher);
matcher.addMessageContentMatcher(bodyMatcher);
// Broker should route the federated message to the client and it should
// carry the hops annotation indicating that one hop has occurred.
receivingPeer.expectTransfer().withPayload(matcher).accept();
peer.expectDisposition().withState().accepted();
peer.remoteTransfer().withHeader().withDurability(true)
.also()
.withMessageAnnotations().withAnnotation("x-opt-test", "test").also()
.withBody().withString("Hello World")
.also()
.withDeliveryId(1)
.now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}