From 7555319dd01b44536189f34f0c6fc7117cb467ff Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 13 Nov 2024 12:28:05 -0500 Subject: [PATCH] ARTEMIS-5153 Mark federation events and control queues as internal In order to better indicate their nature as broker feature specific queues we can mark the temporary queues created for AMQP federation events and control link messages as internal. --- .../amqp/broker/AMQPSessionCallback.java | 26 ++++++++++++++----- .../AMQPFederationCommandDispatcher.java | 2 +- .../AMQPFederationEventDispatcher.java | 2 +- .../connect/AMQPFederationConnectTest.java | 19 ++++++++++++++ 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 819b3311bb..b93a1280bd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -306,11 +306,15 @@ public class AMQPSessionCallback implements SessionCallback { createTemporaryQueue(queueName, queueName, routingType, null, maxConsumers); } + public void createTemporaryQueue(SimpleString queueName, RoutingType routingType, Integer maxConsumers, Boolean internal) throws Exception { + createTemporaryQueue(queueName, queueName, routingType, null, maxConsumers, internal); + } + public void createTemporaryQueue(SimpleString address, SimpleString queueName, RoutingType routingType, SimpleString filter) throws Exception { - createTemporaryQueue(address, queueName, routingType, filter, null); + createTemporaryQueue(address, queueName, routingType, filter, null, null); } public void createTemporaryQueue(SimpleString address, @@ -318,13 +322,23 @@ public class AMQPSessionCallback implements SessionCallback { RoutingType routingType, SimpleString filter, Integer maxConsumers) throws Exception { + createTemporaryQueue(address, queueName, routingType, filter, null, null); + } + + public void createTemporaryQueue(SimpleString address, + SimpleString queueName, + RoutingType routingType, + SimpleString filter, + Integer maxConsumers, + Boolean internal) throws Exception { try { serverSession.createQueue(QueueConfiguration.of(queueName).setAddress(address) - .setRoutingType(routingType) - .setFilterString(filter) - .setTemporary(true) - .setDurable(false) - .setMaxConsumers(maxConsumers)); + .setRoutingType(routingType) + .setFilterString(filter) + .setTemporary(true) + .setDurable(false) + .setMaxConsumers(maxConsumers) + .setInternal(internal)); } catch (ActiveMQSecurityException se) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java index c44c75d4c8..b4fbc9fdb5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java @@ -125,7 +125,7 @@ public class AMQPFederationCommandDispatcher implements SenderController { controlAddress = federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress()); try { - session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()), RoutingType.ANYCAST, 1); + session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()), RoutingType.ANYCAST, 1, true); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java index d7e92a72c7..9b4f23832b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java @@ -136,7 +136,7 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ } try { - session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), RoutingType.ANYCAST, 1); + session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), RoutingType.ANYCAST, 1, true); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java index f2b2c61ef4..43bb4cd5d6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java @@ -47,6 +47,9 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.allOf; @@ -66,11 +69,13 @@ import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; @@ -870,6 +875,13 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { Wait.assertTrue(() -> server.locateQueue(federationControlSenderAddress) != null); + final Queue result = server.locateQueue(SimpleString.of(federationControlSenderAddress)); + + assertNotNull(result); + assertTrue(result.isTemporary()); + assertTrue(result.isInternalQueue()); + assertEquals(1, result.getMaxConsumers()); + // Try and bind to the control address which should be rejected as the queue // was created with max consumers of one. peer.expectAttach().ofSender() @@ -1027,6 +1039,13 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { // the server to allow sends of events beyond currently available credit. Wait.assertTrue(() -> server.locateQueue(federationEventsSenderAddress) != null); + final Queue result = server.locateQueue(SimpleString.of(federationEventsSenderAddress)); + + assertNotNull(result); + assertTrue(result.isTemporary()); + assertTrue(result.isInternalQueue()); + assertEquals(1, result.getMaxConsumers()); + // Try and bind to the events address which should be rejected as the queue // was created with max consumers of one. peer.expectAttach().ofSender()