diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index fadf64fe21..5562ac708f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -348,7 +348,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, throw new IllegalAccessException("Cannot start replica"); } - AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements()); + AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements(), replicaConfig.isQueueCreation(), replicaConfig.isQueueRemoval()); server.scanAddresses(newPartition); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 645b386162..41f0c40876 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -67,6 +67,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom final Queue snfQueue; final ActiveMQServer server; final boolean acks; + final boolean addQueues; + final boolean deleteQueues; boolean started; @@ -83,10 +85,12 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom return started; } - public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks) { + public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks, boolean addQueues, boolean deleteQueues) { this.snfQueue = snfQueue; this.server = server; this.acks = acks; + this.addQueues = addQueues; + this.deleteQueues = deleteQueues; } @Override @@ -103,26 +107,34 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom @Override public void addAddress(AddressInfo addressInfo) throws Exception { - Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON()); - route(server, message); + if (addQueues) { + Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON()); + route(server, message); + } } @Override public void deleteAddress(AddressInfo addressInfo) throws Exception { - Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON()); - route(server, message); + if (deleteQueues) { + Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON()); + route(server, message); + } } @Override public void createQueue(QueueConfiguration queueConfiguration) throws Exception { - Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON()); - route(server, message); + if (addQueues) { + Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON()); + route(server, message); + } } @Override public void deleteQueue(SimpleString address, SimpleString queue) throws Exception { - Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString()); - route(server, message); + if (deleteQueues) { + Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString()); + route(server, message); + } } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index eb5b202854..0a668aeadc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; @@ -113,6 +114,55 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { server.stop(); } + @Test + public void testDoNotSendDelete() throws Exception { + testDoNotSendStuff(false); + } + + @Test + public void testDoNotSendCreate() throws Exception { + testDoNotSendStuff(true); + } + + private void testDoNotSendStuff(boolean sendCreate) throws Exception { + boolean ignoreCreate = false; + server.start(); + + final SimpleString ADDRESS_NAME = SimpleString.toSimpleString("address"); + + server_2 = createServer(AMQP_PORT_2, false); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT); + AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement(); + if (ignoreCreate) { + mirror.setQueueCreation(false); + } else { + mirror.setQueueCreation(true); + mirror.setQueueRemoval(false); + } + amqpConnection.addElement(mirror); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + server_2.start(); + Wait.assertTrue(server::isActive); + + server_2.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST)); + server_2.createQueue(new QueueConfiguration(ADDRESS_NAME).setDurable(true).setAddress(ADDRESS_NAME)); + + if (!ignoreCreate) { + Wait.assertTrue(() -> server.locateQueue(ADDRESS_NAME) != null); + Wait.assertTrue(() -> server.getAddressInfo(ADDRESS_NAME) != null); + } + + if (ignoreCreate) { + Thread.sleep(500); // things are asynchronous, I need to wait some time to make sure things are transferred over + Assert.assertTrue(server.locateQueue(ADDRESS_NAME) == null); + Assert.assertTrue(server.getAddressInfo(ADDRESS_NAME) == null); + } + server_2.stop(); + server.stop(); + } + @Test public void testReplicaCatchupOnQueueCreatesAndDeletes() throws Exception { server.start();