ARTEMIS-4585 Previously installed mirror queues with metrics plugin would make mirror non usable

This commit is contained in:
Clebert Suconic 2024-01-26 12:38:22 -05:00 committed by clebertsuconic
parent 83eb03c665
commit ddac006161
2 changed files with 118 additions and 16 deletions

View File

@ -507,7 +507,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true);
}
try {
server.registerQueueOnManagement(mirrorControlQueue, true);
} catch (Throwable ignored) {
logger.debug(ignored.getMessage(), ignored);
}
logger.debug("Mirror queue {}", mirrorControlQueue.getName());

View File

@ -27,11 +27,19 @@ import javax.jms.TextMessage;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
@ -63,6 +71,8 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
public static final String DC1_NODE_A = "interruptLarge/DC1";
public static final String DC2_NODE_A = "interruptLarge/DC2";
private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
Process processDC1_node_A;
Process processDC2_node_A;
@ -94,6 +104,36 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
Path configPath = new File(getServerLocation(serverName), "./etc/broker.xml").toPath();
String brokerXML = Files.readString(configPath);
// the SimpleMetricsPlugin needs to be added throught the XML
String insert;
{
StringWriter insertWriter = new StringWriter();
insertWriter.write("\n");
insertWriter.write(" <metrics>\n");
insertWriter.write(" <jvm-memory>false</jvm-memory>\n");
insertWriter.write(" <jvm-gc>true</jvm-gc>\n");
insertWriter.write(" <jvm-threads>true</jvm-threads>\n");
insertWriter.write(" <netty-pool>true</netty-pool>\n");
insertWriter.write(" <plugin class-name=\"org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin\">\n");
insertWriter.write(" <property key=\"foo\" value=\"x\"/>\n");
insertWriter.write(" <property key=\"bar\" value=\"y\"/>\n");
insertWriter.write(" <property key=\"baz\" value=\"z\"/>\n");
insertWriter.write(" </plugin>\n");
insertWriter.write(" </metrics>\n");
insertWriter.write(" </core>\n");
insert = insertWriter.toString();
}
brokerXML = brokerXML.replace("</core>", insert);
Assert.assertTrue(brokerXML.contains("SimpleMetricsPlugin"));
Files.writeString(configPath, brokerXML);
}
@BeforeClass
@ -102,34 +142,50 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
}
private void startDC1() throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
}
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
}
@Test
@Test(timeout = 240_000L)
public void testAMQP() throws Exception {
testInterrupt("AMQP");
}
@Test
@Test(timeout = 240_000L)
public void testCORE() throws Exception {
testInterrupt("CORE");
}
private void preCreateInternalQueues(String serverLocation) throws Exception {
Configuration configuration = createDefaultConfig(0, false);
configuration.setJournalDirectory(getServerLocation(serverLocation) + "/data/journal");
configuration.setJournalFileSize(ActiveMQDefaultConfiguration.getDefaultJournalFileSize());
configuration.setBindingsDirectory(getServerLocation(serverLocation) + "/data/bindings");
configuration.setLargeMessagesDirectory(getServerLocation(serverLocation) + "/data/large-messages");
ActiveMQServer server = createServer(true, configuration);
server.start();
try {
server.addAddressInfo(new AddressInfo(SNF_QUEUE).addRoutingType(RoutingType.ANYCAST).setInternal(false));
server.createQueue(new QueueConfiguration(SNF_QUEUE).setRoutingType(RoutingType.ANYCAST).setAddress(SNF_QUEUE).setDurable(true).setInternal(false));
} catch (Throwable error) {
logger.warn(error.getMessage(), error);
}
server.stop();
}
private void testInterrupt(final String protocol) throws Exception {
// This will force internal queues as "non internal"
// this is in an attempt to create issues between versions of the broker
preCreateInternalQueues(DC1_NODE_A);
preCreateInternalQueues(DC2_NODE_A);
startDC1();
final int numberOfMessages = 400;
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI);
@ -154,13 +210,45 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
startDC2();
// Waiting for at least one large message file in the target server
Wait.assertTrue(() -> getNumberOfLargeMessages(DC2_NODE_A) > 0, 5000, 100);
// We will keep interrupting the servers alternatively until all messages were transferred
boolean interruptSource = true;
while (getNumberOfLargeMessages(DC2_NODE_A) < numberOfMessages) {
if (interruptSource) {
stopDC1();
} else {
stopDC2();
}
long messagesBeforeStart = getNumberOfLargeMessages(DC2_NODE_A);
if (interruptSource) {
startDC1();
} else {
startDC2();
}
interruptSource = !interruptSource; // switch which side we are interrupting next time
long currentMessages = messagesBeforeStart;
// Waiting some progress
while (currentMessages == messagesBeforeStart && currentMessages < numberOfMessages) {
currentMessages = getNumberOfLargeMessages(DC2_NODE_A);
Thread.sleep(100);
}
Thread.sleep(2000);
currentMessages = getNumberOfLargeMessages(DC2_NODE_A);
if (logger.isDebugEnabled()) {
logger.debug("*******************************************************************************************************************************");
logger.debug("There are currently {} in the broker", currentMessages);
logger.debug("*******************************************************************************************************************************");
}
}
}
try (Connection connection = connectionFactoryDC2A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
@ -177,8 +265,8 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
session.commit();
}
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, SNF_QUEUE));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, SNF_QUEUE));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
@ -192,6 +280,16 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
return lmFolder.list().length;
}
private void startDC1() throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
}
private void stopDC1() throws Exception {
processDC1_node_A.destroyForcibly();
Assert.assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS));
}
private void stopDC2() throws Exception {
processDC2_node_A.destroyForcibly();
Assert.assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));