ARTEMIS-4725 Mirroring tests using multiple versions

This commit is contained in:
Clebert Suconic 2024-04-16 14:43:02 -04:00 committed by clebertsuconic
parent 329d963717
commit eb7b0b0946
5 changed files with 194 additions and 22 deletions

View File

@ -443,6 +443,28 @@
</libList> </libList>
<file>${basedir}/target/ARTEMIS-2_18_0.cp</file> <file>${basedir}/target/ARTEMIS-2_18_0.cp</file>
</configuration> </configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>2_33_0-check</id>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.33.0</arg>
<arg>org.apache.activemq:artemis-cli:2.33.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.33.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.33.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.33.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<file>${basedir}/target/ARTEMIS-2_33_0.cp</file>
</configuration>
</execution> </execution>
<execution> <execution>
<phase>compile</phase> <phase>compile</phase>

View File

@ -43,6 +43,7 @@ public class GroovyRun {
public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0"; public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0";
public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0"; public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0";
public static final String TWO_TWENTYEIGHT_ZERO = "ARTEMIS-2_28_0"; public static final String TWO_TWENTYEIGHT_ZERO = "ARTEMIS-2_28_0";
public static final String TWO_THIRTYTHREE_ZERO = "ARTEMIS-2_33_0";
public static final String HORNETQ_235 = "HORNETQ-235"; public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247"; public static final String HORNETQ_247 = "HORNETQ-247";
public static final String AMQ_5_11 = "AMQ_5_11"; public static final String AMQ_5_11 = "AMQ_5_11";

View File

@ -29,6 +29,9 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
String folder = arg[0]; String folder = arg[0];
String id = arg[1]; String id = arg[1];
String queueName = arg[2]
String topicName = arg[3]
boolean useDual = Boolean.parseBoolean(arg[4])
configuration = new ConfigurationImpl(); configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO); configuration.setJournalType(JournalType.NIO);
@ -37,8 +40,20 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:61617");
configuration.setSecurityEnabled(false); configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true); configuration.setPersistenceEnabled(true);
configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("TestQueue").addRoutingType(RoutingType.ANYCAST)); configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
configuration.addQueueConfiguration(new QueueConfiguration("TestQueue").setAddress("TestQueue").setRoutingType(RoutingType.ANYCAST)); configuration.addQueueConfiguration(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(topicName).addRoutingType(RoutingType.MULTICAST));
if (useDual) {
try {
AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(false).setMessageAcknowledgements(true)
connection.addElement(replication);
configuration.addAMQPConnection(connection);
} catch (Throwable ignored) {
}
}
theBackupServer = new EmbeddedActiveMQ(); theBackupServer = new EmbeddedActiveMQ();
theBackupServer.setConfiguration(configuration); theBackupServer.setConfiguration(configuration);

View File

@ -29,6 +29,8 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
String folder = arg[0]; String folder = arg[0];
String id = arg[1]; String id = arg[1];
String queueName = arg[2]
String topicName = arg[3]
configuration = new ConfigurationImpl(); configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO); configuration.setJournalType(JournalType.NIO);
@ -37,12 +39,13 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:61616");
configuration.setSecurityEnabled(false); configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true); configuration.setPersistenceEnabled(true);
configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("TestQueue").addRoutingType(RoutingType.ANYCAST)); configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
configuration.addQueueConfiguration(new QueueConfiguration("TestQueue").setAddress("TestQueue").setRoutingType(RoutingType.ANYCAST)); configuration.addQueueConfiguration(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(topicName).addRoutingType(RoutingType.MULTICAST));
try { try {
AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(100); AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true); AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(false).setMessageAcknowledgements(true)
connection.addElement(replication); connection.addElement(replication);
configuration.addAMQPConnection(connection); configuration.addAMQPConnection(connection);
} catch (Throwable ignored) { } catch (Throwable ignored) {

View File

@ -23,6 +23,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.File;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
@ -30,7 +32,10 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase; import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -41,32 +46,41 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_THIRTYTHREE_ZERO;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYEIGHT_ZERO; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYEIGHT_ZERO;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class MirroredVersionTest extends ClasspathBase { public class MirroredVersionTest extends ClasspathBase {
private static final String QUEUE_NAME = "MirroredQueue";
private static final String TOPIC_NAME = "MirroredTopic";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ClassLoader mainClassloader; private final ClassLoader mainClassloader;
private final ClassLoader backupClassLoader; private final ClassLoader backupClassLoader;
private final boolean useDual;
@Parameterized.Parameters(name = "BrokerA={0}, BrokerB={1}")
@Parameterized.Parameters(name = "BrokerA={0}, BrokerB={1}, dualMirror={2}")
public static Collection getParameters() { public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>(); List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{TWO_TWENTYEIGHT_ZERO, SNAPSHOT}); combinations.add(new Object[]{TWO_THIRTYTHREE_ZERO, SNAPSHOT, true});
combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYEIGHT_ZERO}); combinations.add(new Object[]{SNAPSHOT, TWO_THIRTYTHREE_ZERO, true});
// The SNAPSHOT/SNAPSHOT is here as a test validation only, like in other cases where SNAPSHOT/SNAPSHOT is used. combinations.add(new Object[]{TWO_TWENTYEIGHT_ZERO, SNAPSHOT, false});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT}); combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYEIGHT_ZERO, false});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, true});
return combinations; return combinations;
} }
public MirroredVersionTest(String main, String backup) throws Exception { public MirroredVersionTest(String main, String backup, boolean useDual) throws Exception {
this.mainClassloader = getClasspath(main); this.mainClassloader = getClasspath(main);
this.backupClassLoader = getClasspath(backup); this.backupClassLoader = getClasspath(backup);
this.useDual = useDual;
} }
@After @After
@ -79,6 +93,9 @@ public class MirroredVersionTest extends ClasspathBase {
evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy"); evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy");
} catch (Exception ignored) { } catch (Exception ignored) {
} }
FileUtil.deleteDirectory(new File(serverFolder.getRoot().getAbsolutePath(), "1"));
FileUtil.deleteDirectory(new File(serverFolder.getRoot().getAbsolutePath(), "2"));
} }
private String createBody(int size) { private String createBody(int size) {
@ -93,26 +110,24 @@ public class MirroredVersionTest extends ClasspathBase {
@Test @Test
public void testMirrorReplica() throws Throwable { public void testMirrorReplica() throws Throwable {
testMirrorReplicat(100); testMirrorReplica(100);
} }
@Test @Test
public void testMirrorReplicaLM() throws Throwable { public void testMirrorReplicaLM() throws Throwable {
testMirrorReplicat(300 * 1024); testMirrorReplica(300 * 1024);
} }
public void testMirrorReplicat(int stringSize) throws Throwable { public void testMirrorReplica(int stringSize) throws Throwable {
String body = createBody(stringSize); String body = createBody(stringSize);
logger.debug("Starting live"); logger.debug("Starting live");
evaluate(mainClassloader, "multiVersionMirror/mainServer.groovy", serverFolder.getRoot().getAbsolutePath(), "1"); startMainBroker();
logger.debug("Starting backup");
evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", serverFolder.getRoot().getAbsolutePath(), "2");
ConnectionFactory factoryMain = new JmsConnectionFactory("amqp://localhost:61616"); ConnectionFactory factoryMain = new JmsConnectionFactory("amqp://localhost:61616");
try (Connection connection = factoryMain.createConnection()) { try (Connection connection = factoryMain.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("TestQueue")); MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("hello " + i + body); TextMessage message = session.createTextMessage("hello " + i + body);
message.setIntProperty("count", i); message.setIntProperty("count", i);
@ -121,11 +136,18 @@ public class MirroredVersionTest extends ClasspathBase {
session.commit(); session.commit();
} }
logger.debug("restarting main server");
evaluate(mainClassloader, "multiVersionMirror/mainServerStop.groovy");
startMainBroker();
logger.debug("starting backup");
startBackupBroker();
ConnectionFactory factoryReplica = new JmsConnectionFactory("amqp://localhost:61617"); ConnectionFactory factoryReplica = new JmsConnectionFactory("amqp://localhost:61617");
try (Connection connection = factoryReplica.createConnection()) { try (Connection connection = factoryReplica.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue("TestQueue")); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
connection.start(); connection.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000); TextMessage message = (TextMessage) consumer.receive(5000);
@ -139,11 +161,11 @@ public class MirroredVersionTest extends ClasspathBase {
logger.debug("Restarting backup"); logger.debug("Restarting backup");
evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy"); evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy");
evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", serverFolder.getRoot().getAbsolutePath(), "2"); startBackupBroker();
try (Connection connection = factoryReplica.createConnection()) { try (Connection connection = factoryReplica.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue("TestQueue")); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
connection.start(); connection.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000); TextMessage message = (TextMessage) consumer.receive(5000);
@ -154,4 +176,113 @@ public class MirroredVersionTest extends ClasspathBase {
session.commit(); session.commit();
} }
} }
private void startMainBroker() throws Exception {
evaluate(mainClassloader, "multiVersionMirror/mainServer.groovy", serverFolder.getRoot().getAbsolutePath(), "1", QUEUE_NAME, TOPIC_NAME);
}
private void startBackupBroker() throws Exception {
evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", serverFolder.getRoot().getAbsolutePath(), "2", QUEUE_NAME, TOPIC_NAME, String.valueOf(useDual));
}
@Test
public void testTopic() throws Throwable {
int stringSize = 100;
String body = createBody(stringSize);
logger.debug("Starting live");
startMainBroker();
logger.debug("Starting backup");
startBackupBroker();
String clientID1 = "CONNECTION_1";
String clientID2 = "CONNECTION_2";
String sub1 = "SUB_1";
String sub2 = "SUB_2";
ConnectionFactory factoryMain = new JmsConnectionFactory("amqp://localhost:61616");
try (javax.jms.Connection connection = factoryMain.createConnection()) {
connection.setClientID(clientID1);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createDurableConsumer(topic, sub1);
}
try (javax.jms.Connection connection = factoryMain.createConnection()) {
connection.setClientID(clientID2);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createDurableConsumer(topic, sub2);
}
evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy");
try (Connection connection = factoryMain.createConnection()) {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(null);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("hello " + i + body);
message.setIntProperty("count", i);
producer.send(topic, message);
}
session.commit();
}
evaluate(mainClassloader, "multiVersionMirror/mainServerStop.groovy");
startBackupBroker();
startMainBroker();
ConnectionFactory factoryReplica = new JmsConnectionFactory("amqp://localhost:61617");
try (Connection connection = factoryReplica.createConnection()) {
connection.setClientID(clientID1);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
connection.start();
MessageConsumer consumer = session.createDurableConsumer(topic, sub1);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
}
session.rollback();
}
logger.debug("Restarting backup");
evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy");
startBackupBroker();
try (Connection connection = factoryReplica.createConnection()) {
connection.setClientID(clientID1);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
connection.start();
MessageConsumer consumer = session.createDurableConsumer(topic, sub1);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
}
session.commit();
}
try (Connection connection = factoryReplica.createConnection()) {
connection.setClientID(clientID2);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
connection.start();
MessageConsumer consumer = session.createDurableConsumer(topic, sub2);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(message);
}
session.commit();
}
if (useDual) {
SimpleManagement simpleManagementMainServer = new SimpleManagement("tcp://localhost:61616", null, null);
Wait.assertEquals(0, () -> simpleManagementMainServer.getMessageCountOnQueue(clientID1 + "." + sub1), 5000);
Wait.assertEquals(0, () -> simpleManagementMainServer.getMessageCountOnQueue(clientID2 + "." + sub2), 5000);
}
}
} }