ARTEMIS-4677 Validating AutoCreate with Mirror and Clustering

There is no semantic change on this commit.

I wrote a test to validate a scenario and this is to keep the test in the codebase.
This commit is contained in:
Clebert Suconic 2024-03-07 13:29:46 -05:00 committed by clebertsuconic
parent 7742936583
commit 063968bb4f
3 changed files with 146 additions and 13 deletions

View File

@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.nio.file.Files;
import java.util.HashSet; import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
@ -61,6 +62,22 @@ public class RealServerTestBase extends ActiveMQTestBase {
public static final String basedir = System.getProperty("basedir"); public static final String basedir = System.getProperty("basedir");
/**
* Search and replace strings on a file
*
* @param file file to be replaced
* @param find string expected to match
* @param replace string to be replaced
* @return true if the replacement was successful
* @throws Exception
*/
public static boolean findReplace(File file, String find, String replace) throws Exception {
String original = Files.readString(file.toPath());
String newContent = original.replace(find, replace);
Files.writeString(file.toPath(), newContent);
return !original.equals(newContent);
}
@After @After
public void after() throws Exception { public void after() throws Exception {
// close ServerLocators before killing the server otherwise they'll hang and delay test termination // close ServerLocators before killing the server otherwise they'll hang and delay test termination

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
@ -29,11 +30,19 @@ import java.io.File;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
@ -43,7 +52,7 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class MirroredTopicSoakTest extends SoakTestBase { public class ClusteredMirrorSoakTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -98,6 +107,11 @@ public class MirroredTopicSoakTest extends SoakTestBase {
brokerProperties.put("largeMessageSync", "false"); brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties"); File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile); saveProperties(brokerProperties, brokerPropertiesFile);
File brokerXml = new File(serverLocation, "/etc/broker.xml");
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n"));
} }
@BeforeClass @BeforeClass
@ -121,7 +135,7 @@ public class MirroredTopicSoakTest extends SoakTestBase {
} }
@Test @Test
public void testQueue() throws Exception { public void testSimpleQueue() throws Exception {
startServers(); startServers();
final int numberOfMessages = 200; final int numberOfMessages = 200;
@ -197,6 +211,118 @@ public class MirroredTopicSoakTest extends SoakTestBase {
} }
} }
private CountDownLatch startConsumer(Executor executor, ConnectionFactory factory, String queue, AtomicBoolean running, AtomicInteger errorCount, AtomicInteger receivedCount) {
CountDownLatch done = new CountDownLatch(1);
executor.execute(() -> {
try {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queue));
connection.start();
while (running.get()) {
Message message = consumer.receive(100);
if (message != null) {
receivedCount.incrementAndGet();
}
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
errorCount.incrementAndGet();
} finally {
done.countDown();
}
});
return done;
}
private boolean findQueue(SimpleManagement simpleManagement, String queue) {
try {
simpleManagement.getMessageCountOnQueue(queue);
return true;
} catch (Exception e) {
return false;
}
}
private void sendMessages(ConnectionFactory factory, String queueName, int messages, int commitInterval) throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < messages; i++) {
TextMessage message;
boolean large;
if (i % 1 == 2) {
message = session.createTextMessage(largeBody);
large = true;
} else {
message = session.createTextMessage(smallBody);
large = false;
}
message.setIntProperty("i", i);
message.setBooleanProperty("large", large);
producer.send(message);
if (i > 0 && i % commitInterval == 0) {
logger.debug("commit {}", i);
session.commit();
}
}
session.commit();
}
}
@Test
public void testAutoCreateQueue() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
runAfter(executorService::shutdownNow);
startServers();
String queueName = "queue" + RandomUtil.randomString();
final int numberOfMessages = 50;
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", DC2_NODEB_URI);
AtomicBoolean runningConsumers = new AtomicBoolean(true);
runAfter(() -> runningConsumers.set(false));
AtomicInteger errors = new AtomicInteger(0);
AtomicInteger receiverCount = new AtomicInteger(0);
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);
CountDownLatch doneDC2B = startConsumer(executorService, connectionFactoryDC2B, queueName, runningConsumers, errors, receiverCount);
sendMessages(connectionFactoryDC1A, queueName, numberOfMessages, 10);
Wait.assertEquals(numberOfMessages, receiverCount::get, 5000);
Wait.assertTrue(() -> findQueue(simpleManagementDC1A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC1B, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2A, queueName));
Wait.assertTrue(() -> findQueue(simpleManagementDC2B, queueName));
Wait.assertEquals(0, () -> simpleManagementDC1A.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1B.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getDeliveringCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2B.getDeliveringCountOnQueue(queueName), 5000);
runningConsumers.set(false);
Assert.assertTrue(doneDC2B.await(5, TimeUnit.SECONDS));
}
@Test @Test
public void testMirroredTopics() throws Exception { public void testMirroredTopics() throws Exception {
startServers(); startServers();

View File

@ -27,8 +27,6 @@ import javax.jms.TextMessage;
import java.io.File; import java.io.File;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -105,11 +103,6 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
File brokerPropertiesFile = new File(serverLocation, "broker.properties"); File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile); saveProperties(brokerProperties, brokerPropertiesFile);
Path configPath = new File(getServerLocation(serverName), "./etc/broker.xml").toPath();
String brokerXML = Files.readString(configPath);
// the SimpleMetricsPlugin needs to be added throught the XML
String insert; String insert;
{ {
StringWriter insertWriter = new StringWriter(); StringWriter insertWriter = new StringWriter();
@ -130,10 +123,7 @@ public class InterruptedLargeMessageTest extends SoakTestBase {
insert = insertWriter.toString(); insert = insertWriter.toString();
} }
brokerXML = brokerXML.replace("</core>", insert); Assert.assertTrue(findReplace(new File(getServerLocation(serverName), "./etc/broker.xml"), "</core>", insert));
Assert.assertTrue(brokerXML.contains("SimpleMetricsPlugin"));
Files.writeString(configPath, brokerXML);
} }
@BeforeClass @BeforeClass