ARTEMIS-2937 Implementing skip create and skip delete on Mirror Source

This commit is contained in:
Clebert Suconic 2020-10-30 08:38:20 -04:00
parent 7a5f325b72
commit dff2ed3638
3 changed files with 72 additions and 10 deletions

View File

@ -348,7 +348,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
throw new IllegalAccessException("Cannot start replica");
}
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements());
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements(), replicaConfig.isQueueCreation(), replicaConfig.isQueueRemoval());
server.scanAddresses(newPartition);

View File

@ -67,6 +67,8 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
final Queue snfQueue;
final ActiveMQServer server;
final boolean acks;
final boolean addQueues;
final boolean deleteQueues;
boolean started;
@ -83,10 +85,12 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
return started;
}
public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks) {
public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks, boolean addQueues, boolean deleteQueues) {
this.snfQueue = snfQueue;
this.server = server;
this.acks = acks;
this.addQueues = addQueues;
this.deleteQueues = deleteQueues;
}
@Override
@ -103,26 +107,34 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON());
route(server, message);
if (addQueues) {
Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON());
route(server, message);
}
}
@Override
public void deleteAddress(AddressInfo addressInfo) throws Exception {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON());
route(server, message);
if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON());
route(server, message);
}
}
@Override
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
route(server, message);
if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
route(server, message);
}
}
@Override
public void deleteQueue(SimpleString address, SimpleString queue) throws Exception {
Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString());
route(server, message);
if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString());
route(server, message);
}
}
@Override

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
@ -113,6 +114,55 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server.stop();
}
@Test
public void testDoNotSendDelete() throws Exception {
testDoNotSendStuff(false);
}
@Test
public void testDoNotSendCreate() throws Exception {
testDoNotSendStuff(true);
}
private void testDoNotSendStuff(boolean sendCreate) throws Exception {
boolean ignoreCreate = false;
server.start();
final SimpleString ADDRESS_NAME = SimpleString.toSimpleString("address");
server_2 = createServer(AMQP_PORT_2, false);
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
if (ignoreCreate) {
mirror.setQueueCreation(false);
} else {
mirror.setQueueCreation(true);
mirror.setQueueRemoval(false);
}
amqpConnection.addElement(mirror);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.start();
Wait.assertTrue(server::isActive);
server_2.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(ADDRESS_NAME).setDurable(true).setAddress(ADDRESS_NAME));
if (!ignoreCreate) {
Wait.assertTrue(() -> server.locateQueue(ADDRESS_NAME) != null);
Wait.assertTrue(() -> server.getAddressInfo(ADDRESS_NAME) != null);
}
if (ignoreCreate) {
Thread.sleep(500); // things are asynchronous, I need to wait some time to make sure things are transferred over
Assert.assertTrue(server.locateQueue(ADDRESS_NAME) == null);
Assert.assertTrue(server.getAddressInfo(ADDRESS_NAME) == null);
}
server_2.stop();
server.stop();
}
@Test
public void testReplicaCatchupOnQueueCreatesAndDeletes() throws Exception {
server.start();