ARTEMIS-4733 Infinite mirror reflections after CreateAddress

This commit is contained in:
Clebert Suconic 2024-04-19 14:50:11 -04:00 committed by clebertsuconic
parent ed59b0ea91
commit fdf2ea874b
3 changed files with 168 additions and 30 deletions

View File

@ -118,6 +118,10 @@ public class SimpleManagement implements AutoCloseable {
return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessageCount");
}
public long getMessageAddedOnQueue(String queueName) throws Exception {
return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessagesAdded");
}
public int getDeliveringCountOnQueue(String queueName) throws Exception {
return simpleManagementInt(ResourceNames.QUEUE + queueName, "getDeliveringCount");
}

View File

@ -542,10 +542,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
server.callBrokerAddressPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload));
}
if (!reload && mirrorControllerSource != null) {
mirrorControllerSource.addAddress(addressInfo);
}
boolean result;
if (reload) {
result = addressManager.reloadAddressInfo(addressInfo);
@ -554,6 +550,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
// only register address if it is new
if (result) {
if (!reload && mirrorControllerSource != null) {
mirrorControllerSource.addAddress(addressInfo);
}
try {
managementService.registerAddress(addressInfo);

View File

@ -39,8 +39,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@ -49,8 +53,6 @@ import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,18 +82,17 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Process processDC2_node_A;
Process processDC2_node_B;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC1_NODEB_URI = "tcp://localhost:61617";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static String DC2_NODEB_URI = "tcp://localhost:61619";
private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset) throws Exception {
private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset, boolean paging) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(true);
cliCreateServer.setNoWeb(true);
@ -115,24 +116,16 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n"));
if (paging) {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
}
}
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC1_NODE_B);
cleanupData(DC2_NODE_A);
cleanupData(DC2_NODE_B);
}
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1);
createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2);
createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3);
public static void createRealServers(boolean paging) throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0, paging);
createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1, paging);
createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2, paging);
createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3, paging);
}
private void startServers() throws Exception {
@ -147,8 +140,147 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
ServerUtil.waitForServerToStart(3, 10_000);
}
@Test
public void testAvoidReflections() throws Exception {
createRealServers(true);
String internalQueue = "INTERNAL_QUEUE";
ActiveMQServer tempServer = createServer(true);
tempServer.getConfiguration().setBindingsDirectory(getServerLocation(DC1_NODE_A) + "/data/bindings");
tempServer.getConfiguration().setJournalDirectory(getServerLocation(DC1_NODE_A) + "/data/journal");
tempServer.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
tempServer.start();
tempServer.addAddressInfo(new AddressInfo(internalQueue).addRoutingType(RoutingType.ANYCAST).setInternal(true));
tempServer.createQueue(new QueueConfiguration(internalQueue).setDurable(true).setRoutingType(RoutingType.ANYCAST).setInternal(true).setAddress(internalQueue));
tempServer.stop();
startServers();
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
String queueName = "myQueue";
String topicName = "order";
for (int i = 0; i < 5; i++) {
logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue));
logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue));
// no load generated.. just initial queues should have been sent
Assert.assertTrue(simpleManagementDC1A.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC1B.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20);
Thread.sleep(100);
}
Assert.assertEquals(0, simpleManagementDC2A.getMessageCountOnQueue(queueName));
Assert.assertEquals(0, simpleManagementDC1A.getMessageCountOnQueue(internalQueue));
try {
simpleManagementDC2A.getMessageCountOnQueue(internalQueue);
Assert.fail("Exception expected");
} catch (Exception expected) {
}
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
int numberOfMessages = 1_000;
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.setClientID("conn1");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
MessageConsumer con = session.createDurableConsumer(topic, "hello1");
MessageConsumer con2 = session.createDurableConsumer(topic, "hello2");
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < numberOfMessages; i++) {
if (i % 100 == 0) {
logger.info("Sent topic {}", i);
}
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
}
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
if (i % 100 == 0) {
logger.info("Sent queue {}", i);
}
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
}
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.setClientID("conn1");
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
Queue queue = session.createQueue(queueName);
MessageConsumer[] consumers = new MessageConsumer[] {session.createDurableSubscriber(topic, "hello1"), session.createDurableSubscriber(topic, "hello2"), session.createConsumer(queue)};
for (MessageConsumer c : consumers) {
for (int i = 0; i < numberOfMessages; i++) {
Assert.assertNotNull(c.receive(5000));
if (i % 100 == 0) {
session.commit();
}
}
session.commit();
}
}
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
long countDC1A = simpleManagementDC1A.getMessageAddedOnQueue(snfQueue);
long countDC1B = simpleManagementDC1B.getMessageAddedOnQueue(snfQueue);
for (int i = 0; i < 10; i++) {
// DC1 should be quiet and nothing moving out of it
Assert.assertEquals(countDC1A, simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
Assert.assertEquals(countDC1B, simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
// DC2 is totally passive, nothing should have been generated
Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20);
// we take intervals, allowing to make sure it doesn't grow
Thread.sleep(100);
logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue));
logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue));
}
}
@Test
public void testSimpleQueue() throws Exception {
createRealServers(false);
startServers();
final int numberOfMessages = 200;
@ -303,6 +435,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
ExecutorService executorService = Executors.newFixedThreadPool(2);
runAfter(executorService::shutdownNow);
createRealServers(false);
startServers();
String queueName = "testqueue" + RandomUtil.randomString();
@ -370,6 +503,7 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
@Test
public void testMirroredTopics() throws Exception {
createRealServers(false);
startServers();
final int numberOfMessages = 200;