diff --git a/tests/compatibility-tests/pom.xml b/tests/compatibility-tests/pom.xml index f5b89dac9e..dda16306fe 100644 --- a/tests/compatibility-tests/pom.xml +++ b/tests/compatibility-tests/pom.xml @@ -445,6 +445,28 @@ + compile + + dependency-scan + + 2_33_0-check + + true + + org.apache.activemq:artemis-jms-server:2.33.0 + org.apache.activemq:artemis-cli:2.33.0 + org.apache.activemq:artemis-jms-client:2.33.0 + org.apache.activemq:artemis-hornetq-protocol:2.33.0 + org.apache.activemq:artemis-amqp-protocol:2.33.0 + org.apache.groovy:groovy-all:pom:${groovy.version} + + + org.apache.activemq.tests:compatibility-tests:${project.version} + + ${basedir}/target/ARTEMIS-2_33_0.cp + + + compile dependency-scan diff --git a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java index c51e7dff72..3a73aa3b75 100644 --- a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java +++ b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java @@ -43,6 +43,7 @@ public class GroovyRun { public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0"; public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0"; public static final String TWO_TWENTYEIGHT_ZERO = "ARTEMIS-2_28_0"; + public static final String TWO_THIRTYTHREE_ZERO = "ARTEMIS-2_33_0"; public static final String HORNETQ_235 = "HORNETQ-235"; public static final String HORNETQ_247 = "HORNETQ-247"; public static final String AMQ_5_11 = "AMQ_5_11"; diff --git a/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy b/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy index da024c4c89..035d8beb30 100644 --- a/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy +++ b/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy @@ -29,6 +29,9 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ String folder = arg[0]; String id = arg[1]; +String queueName = arg[2] +String topicName = arg[3] +boolean useDual = Boolean.parseBoolean(arg[4]) configuration = new ConfigurationImpl(); configuration.setJournalType(JournalType.NIO); @@ -37,8 +40,20 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:61617"); configuration.setSecurityEnabled(false); configuration.setPersistenceEnabled(true); -configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("TestQueue").addRoutingType(RoutingType.ANYCAST)); -configuration.addQueueConfiguration(new QueueConfiguration("TestQueue").setAddress("TestQueue").setRoutingType(RoutingType.ANYCAST)); +configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST)); +configuration.addQueueConfiguration(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST)); +configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(topicName).addRoutingType(RoutingType.MULTICAST)); + +if (useDual) { + try { + AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(false).setMessageAcknowledgements(true) + connection.addElement(replication); + configuration.addAMQPConnection(connection); + } catch (Throwable ignored) { + } +} + theBackupServer = new EmbeddedActiveMQ(); theBackupServer.setConfiguration(configuration); diff --git a/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy b/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy index 921d88006b..70d6ab11cb 100644 --- a/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy +++ b/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy @@ -29,6 +29,8 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ String folder = arg[0]; String id = arg[1]; +String queueName = arg[2] +String topicName = arg[3] configuration = new ConfigurationImpl(); configuration.setJournalType(JournalType.NIO); @@ -37,12 +39,13 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:61616"); configuration.setSecurityEnabled(false); configuration.setPersistenceEnabled(true); -configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("TestQueue").addRoutingType(RoutingType.ANYCAST)); -configuration.addQueueConfiguration(new QueueConfiguration("TestQueue").setAddress("TestQueue").setRoutingType(RoutingType.ANYCAST)); +configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST)); +configuration.addQueueConfiguration(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST)); +configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(topicName).addRoutingType(RoutingType.MULTICAST)); try { AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(100); - AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true); + AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(false).setMessageAcknowledgements(true) connection.addElement(replication); configuration.addAMQPConnection(connection); } catch (Throwable ignored) { @@ -50,4 +53,4 @@ try { theMainServer = new EmbeddedActiveMQ(); theMainServer.setConfiguration(configuration); -theMainServer.start(); +theMainServer.start(); \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java index b5f908c231..b6fb3860a9 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java @@ -23,6 +23,8 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import java.io.File; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.invoke.MethodHandles; @@ -30,7 +32,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.Wait; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; import org.junit.Assert; @@ -41,32 +46,41 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_THIRTYTHREE_ZERO; import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYEIGHT_ZERO; @RunWith(Parameterized.class) public class MirroredVersionTest extends ClasspathBase { + private static final String QUEUE_NAME = "MirroredQueue"; + private static final String TOPIC_NAME = "MirroredTopic"; + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final ClassLoader mainClassloader; private final ClassLoader backupClassLoader; + private final boolean useDual; - @Parameterized.Parameters(name = "BrokerA={0}, BrokerB={1}") + + @Parameterized.Parameters(name = "BrokerA={0}, BrokerB={1}, dualMirror={2}") public static Collection getParameters() { List combinations = new ArrayList<>(); - combinations.add(new Object[]{TWO_TWENTYEIGHT_ZERO, SNAPSHOT}); - combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYEIGHT_ZERO}); - // The SNAPSHOT/SNAPSHOT is here as a test validation only, like in other cases where SNAPSHOT/SNAPSHOT is used. - combinations.add(new Object[]{SNAPSHOT, SNAPSHOT}); + combinations.add(new Object[]{TWO_THIRTYTHREE_ZERO, SNAPSHOT, true}); + combinations.add(new Object[]{SNAPSHOT, TWO_THIRTYTHREE_ZERO, true}); + combinations.add(new Object[]{TWO_TWENTYEIGHT_ZERO, SNAPSHOT, false}); + combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYEIGHT_ZERO, false}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, true}); return combinations; } - public MirroredVersionTest(String main, String backup) throws Exception { + public MirroredVersionTest(String main, String backup, boolean useDual) throws Exception { this.mainClassloader = getClasspath(main); this.backupClassLoader = getClasspath(backup); + + this.useDual = useDual; } @After @@ -79,6 +93,9 @@ public class MirroredVersionTest extends ClasspathBase { evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy"); } catch (Exception ignored) { } + + FileUtil.deleteDirectory(new File(serverFolder.getRoot().getAbsolutePath(), "1")); + FileUtil.deleteDirectory(new File(serverFolder.getRoot().getAbsolutePath(), "2")); } private String createBody(int size) { @@ -93,26 +110,24 @@ public class MirroredVersionTest extends ClasspathBase { @Test public void testMirrorReplica() throws Throwable { - testMirrorReplicat(100); + testMirrorReplica(100); } @Test public void testMirrorReplicaLM() throws Throwable { - testMirrorReplicat(300 * 1024); + testMirrorReplica(300 * 1024); } - public void testMirrorReplicat(int stringSize) throws Throwable { + public void testMirrorReplica(int stringSize) throws Throwable { String body = createBody(stringSize); logger.debug("Starting live"); - evaluate(mainClassloader, "multiVersionMirror/mainServer.groovy", serverFolder.getRoot().getAbsolutePath(), "1"); - logger.debug("Starting backup"); - evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", serverFolder.getRoot().getAbsolutePath(), "2"); + startMainBroker(); ConnectionFactory factoryMain = new JmsConnectionFactory("amqp://localhost:61616"); try (Connection connection = factoryMain.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = session.createProducer(session.createQueue("TestQueue")); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); for (int i = 0; i < 10; i++) { TextMessage message = session.createTextMessage("hello " + i + body); message.setIntProperty("count", i); @@ -121,11 +136,18 @@ public class MirroredVersionTest extends ClasspathBase { session.commit(); } + logger.debug("restarting main server"); + evaluate(mainClassloader, "multiVersionMirror/mainServerStop.groovy"); + startMainBroker(); + + logger.debug("starting backup"); + startBackupBroker(); + ConnectionFactory factoryReplica = new JmsConnectionFactory("amqp://localhost:61617"); try (Connection connection = factoryReplica.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(session.createQueue("TestQueue")); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); connection.start(); for (int i = 0; i < 10; i++) { TextMessage message = (TextMessage) consumer.receive(5000); @@ -139,11 +161,11 @@ public class MirroredVersionTest extends ClasspathBase { logger.debug("Restarting backup"); evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy"); - evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", serverFolder.getRoot().getAbsolutePath(), "2"); + startBackupBroker(); try (Connection connection = factoryReplica.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(session.createQueue("TestQueue")); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); connection.start(); for (int i = 0; i < 10; i++) { TextMessage message = (TextMessage) consumer.receive(5000); @@ -154,4 +176,113 @@ public class MirroredVersionTest extends ClasspathBase { session.commit(); } } + + private void startMainBroker() throws Exception { + evaluate(mainClassloader, "multiVersionMirror/mainServer.groovy", serverFolder.getRoot().getAbsolutePath(), "1", QUEUE_NAME, TOPIC_NAME); + } + + private void startBackupBroker() throws Exception { + evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", serverFolder.getRoot().getAbsolutePath(), "2", QUEUE_NAME, TOPIC_NAME, String.valueOf(useDual)); + } + + @Test + public void testTopic() throws Throwable { + int stringSize = 100; + String body = createBody(stringSize); + logger.debug("Starting live"); + startMainBroker(); + logger.debug("Starting backup"); + startBackupBroker(); + + String clientID1 = "CONNECTION_1"; + String clientID2 = "CONNECTION_2"; + + String sub1 = "SUB_1"; + String sub2 = "SUB_2"; + + ConnectionFactory factoryMain = new JmsConnectionFactory("amqp://localhost:61616"); + + try (javax.jms.Connection connection = factoryMain.createConnection()) { + connection.setClientID(clientID1); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + MessageConsumer consumer = session.createDurableConsumer(topic, sub1); + } + try (javax.jms.Connection connection = factoryMain.createConnection()) { + connection.setClientID(clientID2); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + MessageConsumer consumer = session.createDurableConsumer(topic, sub2); + } + + evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy"); + + try (Connection connection = factoryMain.createConnection()) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(TOPIC_NAME); + MessageProducer producer = session.createProducer(null); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage("hello " + i + body); + message.setIntProperty("count", i); + producer.send(topic, message); + } + session.commit(); + } + + evaluate(mainClassloader, "multiVersionMirror/mainServerStop.groovy"); + startBackupBroker(); + startMainBroker(); + + ConnectionFactory factoryReplica = new JmsConnectionFactory("amqp://localhost:61617"); + + try (Connection connection = factoryReplica.createConnection()) { + connection.setClientID(clientID1); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + connection.start(); + MessageConsumer consumer = session.createDurableConsumer(topic, sub1); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + } + session.rollback(); + } + + logger.debug("Restarting backup"); + evaluate(backupClassLoader, "multiVersionMirror/backupServerStop.groovy"); + startBackupBroker(); + + try (Connection connection = factoryReplica.createConnection()) { + connection.setClientID(clientID1); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + connection.start(); + MessageConsumer consumer = session.createDurableConsumer(topic, sub1); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + } + session.commit(); + } + + try (Connection connection = factoryReplica.createConnection()) { + connection.setClientID(clientID2); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + connection.start(); + MessageConsumer consumer = session.createDurableConsumer(topic, sub2); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage)consumer.receive(5000); + Assert.assertNotNull(message); + } + session.commit(); + } + + if (useDual) { + SimpleManagement simpleManagementMainServer = new SimpleManagement("tcp://localhost:61616", null, null); + Wait.assertEquals(0, () -> simpleManagementMainServer.getMessageCountOnQueue(clientID1 + "." + sub1), 5000); + Wait.assertEquals(0, () -> simpleManagementMainServer.getMessageCountOnQueue(clientID2 + "." + sub2), 5000); + } + } + } \ No newline at end of file