ARTEMIS-5153 Mark federation events and control queues as internal

In order to better indicate their nature as broker feature specific queues
we can mark the temporary queues created for AMQP federation events and
control link messages as internal.
This commit is contained in:
Timothy Bish 2024-11-13 12:28:05 -05:00 committed by Robbie Gemmell
parent 8d02f8dc77
commit 7555319dd0
4 changed files with 41 additions and 8 deletions

View File

@ -306,11 +306,15 @@ public class AMQPSessionCallback implements SessionCallback {
createTemporaryQueue(queueName, queueName, routingType, null, maxConsumers); createTemporaryQueue(queueName, queueName, routingType, null, maxConsumers);
} }
public void createTemporaryQueue(SimpleString queueName, RoutingType routingType, Integer maxConsumers, Boolean internal) throws Exception {
createTemporaryQueue(queueName, queueName, routingType, null, maxConsumers, internal);
}
public void createTemporaryQueue(SimpleString address, public void createTemporaryQueue(SimpleString address,
SimpleString queueName, SimpleString queueName,
RoutingType routingType, RoutingType routingType,
SimpleString filter) throws Exception { SimpleString filter) throws Exception {
createTemporaryQueue(address, queueName, routingType, filter, null); createTemporaryQueue(address, queueName, routingType, filter, null, null);
} }
public void createTemporaryQueue(SimpleString address, public void createTemporaryQueue(SimpleString address,
@ -318,13 +322,23 @@ public class AMQPSessionCallback implements SessionCallback {
RoutingType routingType, RoutingType routingType,
SimpleString filter, SimpleString filter,
Integer maxConsumers) throws Exception { Integer maxConsumers) throws Exception {
createTemporaryQueue(address, queueName, routingType, filter, null, null);
}
public void createTemporaryQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType,
SimpleString filter,
Integer maxConsumers,
Boolean internal) throws Exception {
try { try {
serverSession.createQueue(QueueConfiguration.of(queueName).setAddress(address) serverSession.createQueue(QueueConfiguration.of(queueName).setAddress(address)
.setRoutingType(routingType) .setRoutingType(routingType)
.setFilterString(filter) .setFilterString(filter)
.setTemporary(true) .setTemporary(true)
.setDurable(false) .setDurable(false)
.setMaxConsumers(maxConsumers)); .setMaxConsumers(maxConsumers)
.setInternal(internal));
} catch (ActiveMQSecurityException se) { } catch (ActiveMQSecurityException se) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage());
} }

View File

@ -125,7 +125,7 @@ public class AMQPFederationCommandDispatcher implements SenderController {
controlAddress = federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress()); controlAddress = federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress());
try { try {
session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()), RoutingType.ANYCAST, 1); session.createTemporaryQueue(SimpleString.of(getControlLinkAddress()), RoutingType.ANYCAST, 1, true);
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }

View File

@ -136,7 +136,7 @@ public class AMQPFederationEventDispatcher implements SenderController, ActiveMQ
} }
try { try {
session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), RoutingType.ANYCAST, 1); session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), RoutingType.ANYCAST, 1, true);
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }

View File

@ -47,6 +47,9 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.allOf;
@ -66,11 +69,13 @@ import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin; import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@ -870,6 +875,13 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
Wait.assertTrue(() -> server.locateQueue(federationControlSenderAddress) != null); Wait.assertTrue(() -> server.locateQueue(federationControlSenderAddress) != null);
final Queue result = server.locateQueue(SimpleString.of(federationControlSenderAddress));
assertNotNull(result);
assertTrue(result.isTemporary());
assertTrue(result.isInternalQueue());
assertEquals(1, result.getMaxConsumers());
// Try and bind to the control address which should be rejected as the queue // Try and bind to the control address which should be rejected as the queue
// was created with max consumers of one. // was created with max consumers of one.
peer.expectAttach().ofSender() peer.expectAttach().ofSender()
@ -1027,6 +1039,13 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
// the server to allow sends of events beyond currently available credit. // the server to allow sends of events beyond currently available credit.
Wait.assertTrue(() -> server.locateQueue(federationEventsSenderAddress) != null); Wait.assertTrue(() -> server.locateQueue(federationEventsSenderAddress) != null);
final Queue result = server.locateQueue(SimpleString.of(federationEventsSenderAddress));
assertNotNull(result);
assertTrue(result.isTemporary());
assertTrue(result.isInternalQueue());
assertEquals(1, result.getMaxConsumers());
// Try and bind to the events address which should be rejected as the queue // Try and bind to the events address which should be rejected as the queue
// was created with max consumers of one. // was created with max consumers of one.
peer.expectAttach().ofSender() peer.expectAttach().ofSender()