diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java index c0e05b8c60..322915ad15 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java @@ -374,6 +374,9 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal message.setAnnotation(MESSAGE_HOPS_ANNOTATION, numHops.intValue() + 1); } + // Annotations need to be rewritten to carry the change forward. + message.reencode(); + return message; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index fc2d925069..178093b6a5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -87,6 +87,7 @@ import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotation import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher; import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher; import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher; +import org.hamcrest.Matchers; import org.jgroups.util.UUID; import org.junit.Test; import org.slf4j.Logger; @@ -388,16 +389,55 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); peer.expectFlow().withLinkCredit(1000); - final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + // Induce demand on the local broker which should then create a receiver to our remote peer. + try (ProtonTestClient receivingPeer = new ProtonTestClient()) { + receivingPeer.queueClientSaslAnonymousConnect(); + receivingPeer.connect("localhost", AMQP_PORT); + receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); - try (Connection connection = factory.createConnection()) { - final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); - session.createConsumer(session.createTopic("test")); - - connection.start(); + receivingPeer.expectOpen(); + receivingPeer.expectBegin(); + receivingPeer.expectAttach(); + receivingPeer.remoteOpen().withContainerId("test-sender").now(); + receivingPeer.remoteBegin().now(); + receivingPeer.remoteAttach().ofReceiver() + .withInitialDeliveryCount(0) + .withName("sending-peer") + .withSource().withAddress("test") + .withCapabilities("topic").also() + .withTarget().also() + .now(); + receivingPeer.remoteFlow().withLinkCredit(10).now(); + receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.close(); + + // Check that annotation for hops is present in the forwarded message. + final HeaderMatcher headerMatcher = new HeaderMatcher(true); + final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true); + annotationsMatcher.withEntry("x-opt-test", Matchers.equalTo("test")); + annotationsMatcher.withEntry(MESSAGE_HOPS_ANNOTATION.toString(), Matchers.equalTo(1)); + final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher("Hello World"); + final TransferPayloadCompositeMatcher matcher = new TransferPayloadCompositeMatcher(); + matcher.setHeadersMatcher(headerMatcher); + matcher.setMessageAnnotationsMatcher(annotationsMatcher); + matcher.addMessageContentMatcher(bodyMatcher); + + // Broker should route the federated message to the client and it should + // carry the hops annotation indicating that one hop has occurred. + receivingPeer.expectTransfer().withPayload(matcher).accept(); + + peer.expectDisposition().withState().accepted(); + peer.remoteTransfer().withHeader().withDurability(true) + .also() + .withMessageAnnotations().withAnnotation("x-opt-test", "test").also() + .withBody().withString("Hello World") + .also() + .withDeliveryId(1) + .now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } }