diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MultiMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MultiMirrorSoakTest.java new file mode 100644 index 0000000000..697faaf91d --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MultiMirrorSoakTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.io.File; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.HashSet; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MultiMirrorSoakTest extends SoakTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static String largeBody; + private static String smallBody = "This is a small body"; + + static { + StringWriter writer = new StringWriter(); + while (writer.getBuffer().length() < 1024 * 1024) { + writer.append("This is a large string ..... "); + } + largeBody = writer.toString(); + } + + public static final String DC1_NODE_A = "MultiMirrorSoakTest/DC1"; + public static final String DC2_NODE_A = "MultiMirrorSoakTest/DC2"; + public static final String DC3_NODE_A = "MultiMirrorSoakTest/DC3"; + + Process processDC1_node_A; + Process processDC2_node_A; + Process processDC3_node_A; + + private static String DC1_NODEA_URI = "tcp://localhost:61616"; + private static String DC2_NODEA_URI = "tcp://localhost:61617"; + private static String DC3_NODEA_URI = "tcp://localhost:61618"; + + private static void createServer(String serverName, int porOffset, boolean paging, String... mirrorTo) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); + cliCreateServer.setClustered(false); + cliCreateServer.setNoWeb(true); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A); + cliCreateServer.addArgs("--addresses", "order"); + cliCreateServer.addArgs("--queues", "myQueue"); + cliCreateServer.setPortOffset(porOffset); + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("largeMessageSync", "false"); + if (mirrorTo != null && mirrorTo.length > 0) { + int mirrorID = 0; + for (String p : mirrorTo) { + brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".uri", p); + brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".retryInterval", "100"); + brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".connectionElements.mirror.sync", "false"); + mirrorID++; + } + } + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + Assert.assertTrue(brokerXml.exists()); + // Adding redistribution delay to broker configuration + Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + if (paging) { + Assert.assertTrue(FileUtil.findReplace(brokerXml, "-1", "1")); + } + } + + public static void createRealServers(boolean paging) throws Exception { + createServer(DC1_NODE_A, 0, paging, DC2_NODEA_URI, DC3_NODEA_URI); + createServer(DC2_NODE_A, 1, paging, DC1_NODEA_URI); + createServer(DC3_NODE_A, 2, paging, DC1_NODEA_URI); + } + + private void startServers() throws Exception { + processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties")); + processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties")); + processDC3_node_A = startServer(DC3_NODE_A, -1, -1, new File(getServerLocation(DC3_NODE_A), "broker.properties")); + + ServerUtil.waitForServerToStart(0, 10_000); + ServerUtil.waitForServerToStart(1, 10_000); + ServerUtil.waitForServerToStart(2, 10_000); + } + + + @Test + public void testMultiMirror() throws Exception { + createRealServers(false); + startServers(); + internalMirror(DC1_NODEA_URI, DC3_NODEA_URI); + internalMirror(DC1_NODEA_URI, DC2_NODEA_URI); + internalMirror(DC3_NODEA_URI, DC1_NODEA_URI); + internalMirror(DC1_NODEA_URI, DC1_NODEA_URI); + internalMirror(DC2_NODEA_URI, DC3_NODEA_URI); + } + + public void internalMirror(String producerURI, String consumerURi) throws Exception { + final int numberOfMessages = 200; + + Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0); + + ConnectionFactory producerCF = CFUtil.createConnectionFactory("amqp", producerURI); + + SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null); + SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null); + SimpleManagement simpleManagementDC3A = new SimpleManagement(DC3_NODEA_URI, null, null); + + String queueName = "myQueue"; + + try (Connection connection = producerCF.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + + for (int i = 0; i < numberOfMessages; i++) { + TextMessage message; + boolean large; + if (i % 1 == 2) { + message = session.createTextMessage(largeBody); + large = true; + } else { + message = session.createTextMessage(smallBody); + large = false; + } + message.setIntProperty("i", i); + message.setBooleanProperty("large", large); + producer.send(message); + if (i % 100 == 0) { + logger.debug("commit {}", i); + session.commit(); + + Wait.assertEquals(i + 1, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName)); + Wait.assertEquals(i + 1, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName)); + Wait.assertEquals(i + 1, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName)); + } + } + session.commit(); + } + + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName)); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName)); + Wait.assertEquals(numberOfMessages, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName)); + + ConnectionFactory consumerCF = CFUtil.createConnectionFactory("amqp", consumerURi); + + try (Connection connection = consumerCF.createConnection()) { + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < numberOfMessages; i++) { + TextMessage message; + boolean large; + if (i % 1 == 2) { + large = true; + } else { + large = false; + } + message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("i")); + Assert.assertEquals(large, message.getBooleanProperty("large")); + if (i % 100 == 0) { + logger.debug("commit {}", i); + session.commit(); + + + Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000); + Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName), 5000); + } + } + session.commit(); + } + Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName)); + Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName)); + Wait.assertEquals(0, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName)); + } +} \ No newline at end of file