Revert "NO-JIRA Test validating multiple mirrors connected to a node"
This reverts commit bf81e5fe31
.
This commit is contained in:
parent
1e29602158
commit
07ba37a74a
|
@ -1,237 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
|
|
||||||
|
|
||||||
import javax.jms.Connection;
|
|
||||||
import javax.jms.ConnectionFactory;
|
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageConsumer;
|
|
||||||
import javax.jms.MessageProducer;
|
|
||||||
import javax.jms.Queue;
|
|
||||||
import javax.jms.Session;
|
|
||||||
import javax.jms.TextMessage;
|
|
||||||
import javax.jms.Topic;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.lang.invoke.MethodHandles;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
|
||||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
|
||||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
|
||||||
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
|
|
||||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
|
||||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
|
||||||
import org.apache.activemq.artemis.util.ServerUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.FileUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.Wait;
|
|
||||||
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class MultiMirrorSoakTest extends SoakTestBase {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
|
||||||
|
|
||||||
private static String largeBody;
|
|
||||||
private static String smallBody = "This is a small body";
|
|
||||||
|
|
||||||
static {
|
|
||||||
StringWriter writer = new StringWriter();
|
|
||||||
while (writer.getBuffer().length() < 1024 * 1024) {
|
|
||||||
writer.append("This is a large string ..... ");
|
|
||||||
}
|
|
||||||
largeBody = writer.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final String DC1_NODE_A = "MultiMirrorSoakTest/DC1";
|
|
||||||
public static final String DC2_NODE_A = "MultiMirrorSoakTest/DC2";
|
|
||||||
public static final String DC3_NODE_A = "MultiMirrorSoakTest/DC3";
|
|
||||||
|
|
||||||
Process processDC1_node_A;
|
|
||||||
Process processDC2_node_A;
|
|
||||||
Process processDC3_node_A;
|
|
||||||
|
|
||||||
private static String DC1_NODEA_URI = "tcp://localhost:61616";
|
|
||||||
private static String DC2_NODEA_URI = "tcp://localhost:61617";
|
|
||||||
private static String DC3_NODEA_URI = "tcp://localhost:61618";
|
|
||||||
|
|
||||||
private static void createServer(String serverName, int porOffset, boolean paging, String... mirrorTo) throws Exception {
|
|
||||||
File serverLocation = getFileServerLocation(serverName);
|
|
||||||
deleteDirectory(serverLocation);
|
|
||||||
|
|
||||||
HelperCreate cliCreateServer = new HelperCreate();
|
|
||||||
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
|
|
||||||
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
|
|
||||||
cliCreateServer.setClustered(false);
|
|
||||||
cliCreateServer.setNoWeb(true);
|
|
||||||
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
|
|
||||||
cliCreateServer.addArgs("--addresses", "order");
|
|
||||||
cliCreateServer.addArgs("--queues", "myQueue");
|
|
||||||
cliCreateServer.setPortOffset(porOffset);
|
|
||||||
cliCreateServer.createServer();
|
|
||||||
|
|
||||||
Properties brokerProperties = new Properties();
|
|
||||||
brokerProperties.put("largeMessageSync", "false");
|
|
||||||
if (mirrorTo != null && mirrorTo.length > 0) {
|
|
||||||
int mirrorID = 0;
|
|
||||||
for (String p : mirrorTo) {
|
|
||||||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".uri", p);
|
|
||||||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".retryInterval", "100");
|
|
||||||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
|
|
||||||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".connectionElements.mirror.sync", "false");
|
|
||||||
mirrorID++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
|
|
||||||
saveProperties(brokerProperties, brokerPropertiesFile);
|
|
||||||
|
|
||||||
File brokerXml = new File(serverLocation, "/etc/broker.xml");
|
|
||||||
Assert.assertTrue(brokerXml.exists());
|
|
||||||
// Adding redistribution delay to broker configuration
|
|
||||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n"));
|
|
||||||
if (paging) {
|
|
||||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void createRealServers(boolean paging) throws Exception {
|
|
||||||
createServer(DC1_NODE_A, 0, paging, DC2_NODEA_URI, DC3_NODEA_URI);
|
|
||||||
createServer(DC2_NODE_A, 1, paging, DC1_NODEA_URI);
|
|
||||||
createServer(DC3_NODE_A, 2, paging, DC1_NODEA_URI);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startServers() throws Exception {
|
|
||||||
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
|
|
||||||
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
|
|
||||||
processDC3_node_A = startServer(DC3_NODE_A, -1, -1, new File(getServerLocation(DC3_NODE_A), "broker.properties"));
|
|
||||||
|
|
||||||
ServerUtil.waitForServerToStart(0, 10_000);
|
|
||||||
ServerUtil.waitForServerToStart(1, 10_000);
|
|
||||||
ServerUtil.waitForServerToStart(2, 10_000);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMultiMirror() throws Exception {
|
|
||||||
createRealServers(false);
|
|
||||||
startServers();
|
|
||||||
internalMirror(DC1_NODEA_URI, DC3_NODEA_URI);
|
|
||||||
internalMirror(DC1_NODEA_URI, DC2_NODEA_URI);
|
|
||||||
internalMirror(DC3_NODEA_URI, DC1_NODEA_URI);
|
|
||||||
internalMirror(DC1_NODEA_URI, DC1_NODEA_URI);
|
|
||||||
internalMirror(DC2_NODEA_URI, DC3_NODEA_URI);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void internalMirror(String producerURI, String consumerURi) throws Exception {
|
|
||||||
final int numberOfMessages = 200;
|
|
||||||
|
|
||||||
Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0);
|
|
||||||
|
|
||||||
ConnectionFactory producerCF = CFUtil.createConnectionFactory("amqp", producerURI);
|
|
||||||
|
|
||||||
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
|
|
||||||
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
|
|
||||||
SimpleManagement simpleManagementDC3A = new SimpleManagement(DC3_NODEA_URI, null, null);
|
|
||||||
|
|
||||||
String queueName = "myQueue";
|
|
||||||
|
|
||||||
try (Connection connection = producerCF.createConnection()) {
|
|
||||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
|
||||||
Queue queue = session.createQueue(queueName);
|
|
||||||
MessageProducer producer = session.createProducer(queue);
|
|
||||||
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
TextMessage message;
|
|
||||||
boolean large;
|
|
||||||
if (i % 1 == 2) {
|
|
||||||
message = session.createTextMessage(largeBody);
|
|
||||||
large = true;
|
|
||||||
} else {
|
|
||||||
message = session.createTextMessage(smallBody);
|
|
||||||
large = false;
|
|
||||||
}
|
|
||||||
message.setIntProperty("i", i);
|
|
||||||
message.setBooleanProperty("large", large);
|
|
||||||
producer.send(message);
|
|
||||||
if (i % 100 == 0) {
|
|
||||||
logger.debug("commit {}", i);
|
|
||||||
session.commit();
|
|
||||||
|
|
||||||
Wait.assertEquals(i + 1, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName));
|
|
||||||
Wait.assertEquals(i + 1, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName));
|
|
||||||
Wait.assertEquals(i + 1, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
session.commit();
|
|
||||||
}
|
|
||||||
|
|
||||||
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName));
|
|
||||||
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName));
|
|
||||||
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName));
|
|
||||||
|
|
||||||
ConnectionFactory consumerCF = CFUtil.createConnectionFactory("amqp", consumerURi);
|
|
||||||
|
|
||||||
try (Connection connection = consumerCF.createConnection()) {
|
|
||||||
connection.start();
|
|
||||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
|
||||||
Queue queue = session.createQueue(queueName);
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
|
||||||
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
TextMessage message;
|
|
||||||
boolean large;
|
|
||||||
if (i % 1 == 2) {
|
|
||||||
large = true;
|
|
||||||
} else {
|
|
||||||
large = false;
|
|
||||||
}
|
|
||||||
message = (TextMessage) consumer.receive(5000);
|
|
||||||
Assert.assertNotNull(message);
|
|
||||||
Assert.assertEquals(i, message.getIntProperty("i"));
|
|
||||||
Assert.assertEquals(large, message.getBooleanProperty("large"));
|
|
||||||
if (i % 100 == 0) {
|
|
||||||
logger.debug("commit {}", i);
|
|
||||||
session.commit();
|
|
||||||
|
|
||||||
|
|
||||||
Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
|
|
||||||
Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
|
|
||||||
Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName), 5000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
session.commit();
|
|
||||||
}
|
|
||||||
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName));
|
|
||||||
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName));
|
|
||||||
Wait.assertEquals(0, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName));
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue