ARTEMIS-4733 Internal queues should not be mirrored

This commit is contained in:
Clebert Suconic 2024-04-22 00:06:07 -04:00
parent fdf2ea874b
commit 44e78d42a2
2 changed files with 65 additions and 0 deletions

View File

@ -209,6 +209,10 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return;
}
if (addressInfo.isInternal()) {
return;
}
if (ignoreAddress(addressInfo.getName())) {
return;
}

View File

@ -467,6 +467,67 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
}
@Test
public void testCreateInternalQueue() throws Exception {
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2));
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST));
server.getConfiguration().setMessageExpiryScanPeriod(-1);
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName("deadLetterQueue"));
server.getConfiguration().addQueueConfiguration(new QueueConfiguration("deadLetterQueue").setRoutingType(RoutingType.ANYCAST));
server_2.getConfiguration().setMessageExpiryScanPeriod(-1);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to_1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.getConfiguration().addAddressSetting("#", new AddressSettings().setDeadLetterAddress(SimpleString.toSimpleString("deadLetterQueue")).setMaxDeliveryAttempts(2));
server_2.start();
org.apache.activemq.artemis.core.server.Queue to1 = locateQueueWithWait(server_2, "$ACTIVEMQ_ARTEMIS_MIRROR_to_1");
Assert.assertNotNull(to1);
long messagesAddedOnS2 = to1.getMessagesAdded();
String internalQueueName = getQueueName() + "Internal";
server.addAddressInfo(new AddressInfo(internalQueueName).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST).setInternal(true));
server.createQueue(new QueueConfiguration(internalQueueName).setDurable(true).setRoutingType(RoutingType.ANYCAST).setInternal(true));
server.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null, 5000);
Assert.assertTrue(server_2.getAddressInfo(SimpleString.toSimpleString(internalQueueName)) == null);
Assert.assertTrue(server_2.locateQueue(internalQueueName) == null);
Assert.assertEquals(messagesAddedOnS2, to1.getMessagesAdded());
server_2.stop();
server.stop();
}
@Test
public void testLVQ() throws Exception {
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();