diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java index 3ee196cf8a..935a04e222 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Predicate; @@ -100,6 +101,11 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal private static final SimpleString MESSAGE_HOPS_ANNOTATION = SimpleString.of(AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION.toString()); + // Sequence ID value used to keep links that would otherwise have the same name from overlapping + // this generally occurs when a remote link detach is delayed and new demand is added before it + // arrives resulting in an unintended link stealing scenario in the proton engine. + private static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong(); + private static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}; @@ -255,7 +261,8 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal private String generateLinkName() { return "federation-" + federation.getName() + "-address-receiver-" + consumerInfo.getAddress() + - "-" + federation.getServer().getNodeID(); + "-" + federation.getServer().getNodeID() + + "-" + LINK_SEQUENCE_ID.incrementAndGet(); } private void asyncCreateReceiver() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index 17f54b5ac2..ac22936ce9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -4185,6 +4185,104 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { } } + @Test + @Timeout(20) + public void testNewFederationConsumerCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withSenderSettleModeSettled() + .withSource().withDynamic(true) + .and() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind() + .withTarget().withAddress("test-dynamic-events"); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes(getTestName()); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + peer.expectDetach().respond().afterDelay(40); // Defer the detach response for a bit + + server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), RoutingType.MULTICAST)); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Create demand on the address which creates a federation consumer then let it close which + // should shut down that federation consumer. + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName())); + + connection.start(); + + consumer.receiveNoWait(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + peer.expectDetach().respond(); + + // Create demand on the address which creates a federation consumer again quickly which + // can trigger a new consumer before the previous one was fully closed with a Detach + // response and get stuck because it will steal the link in proton and not be treated + // as a new attach for this consumer. + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName())); + + connection.start(); + + consumer.receiveNoWait(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index e597f5a548..f7462c7783 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -3995,6 +3995,102 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { } } + @Test + @Timeout(20) + public void testNewFederationConsumerCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withSenderSettleModeSettled() + .withSource().withDynamic(true) + .and() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind() + .withTarget().withAddress("test-dynamic-events"); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes(getTestName(), getTestName()); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + peer.expectDetach().respond().afterDelay(40); // Defer the detach response for a bit + + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Create demand on the queue which creates a federation consumer then let it close which + // should shut down that federation consumer. + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(getTestName())); + + connection.start(); + + consumer.receiveNoWait(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString(getTestName()), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + peer.expectDetach().respond(); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(getTestName())); + + connection.start(); + + consumer.receiveNoWait(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address);