diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index e00dfb3815..ed9c53bcbf 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -803,7 +803,29 @@ ${basedir}/target/classes/servers/infinite-redelivery - + + test-compile + create-mmfactory + + create + + + + ${basedir}/target/classes/servers/mmfactory + true + admin + admin + ${basedir}/target/mmfactory + false + + --java-options + -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote=true + -Dcom.sun.management.jmxremote.port=11099 -Dcom.sun.management.jmxremote.rmi.port=11098 + -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false + + + + diff --git a/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml b/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml new file mode 100644 index 0000000000..b81420fba2 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml @@ -0,0 +1,244 @@ + + + + + + + + 0.0.0.0 + + + true + + + NIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + true + + 2 + + 10 + + 4096 + + 10M + + + 10000 + + true + + + + 1 + + + + + + + + + + + + + + + + + + true + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + 70560 + + + + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + 2 + 524680 + 124680 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ + +
+
+ + + + true + + +
+
+ +
+
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java index 964b749b7f..313b15c862 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java @@ -17,6 +17,9 @@ package org.apache.activemq.artemis.tests.smoke.common; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; import java.io.File; import java.io.IOException; import java.util.HashSet; @@ -86,4 +89,24 @@ public class SmokeTestBase extends ActiveMQTestBase { return process; } + protected static JMXConnector getJmxConnector(String hostname, int port) throws Exception { + // Without this, the RMI server would bind to the default interface IP (the user's local IP mostly) + System.setProperty("java.rmi.server.hostname", hostname); + + // I don't specify both ports here manually on purpose. See actual RMI registry connection port extraction below. + String urlString = "service:jmx:rmi:///jndi/rmi://" + hostname + ":" + port + "/jmxrmi"; + + JMXServiceURL url = new JMXServiceURL(urlString); + JMXConnector jmxConnector = null; + + try { + jmxConnector = JMXConnectorFactory.connect(url); + System.out.println("Successfully connected to: " + urlString); + } catch (Exception e) { + jmxConnector = null; + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + return jmxConnector; + } } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx2/JmxServerControlTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx2/JmxServerControlTest.java index 421e99c783..a7802e7ea5 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx2/JmxServerControlTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx2/JmxServerControlTest.java @@ -110,4 +110,5 @@ public class JmxServerControlTest extends SmokeTestBase { jmxConnector.close(); } } + } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java new file mode 100644 index 0000000000..8756a867e7 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.mmfactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.jboss.logging.Logger; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MMSFactoryTest extends SmokeTestBase { + + private static final Logger logger = Logger.getLogger(MMSFactoryTest.class); + + public static final String SERVER_NAME_0 = "mmfactory"; + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT = 11099; + + final String theprotocol; + final int BATCH_SIZE; + // how many times the server will be restarted + final int restarts; + // how many times the clients will run per restart + final int clientRuns; + // As the produces sends messages, a client will be killed every X messages. This is it! + final int killClientEveryX; + + + @Parameterized.Parameters(name = "protocol={0}, batchSize={1}, restarts={2}, clientRuns={3}, killEveryX={4}") + public static Collection parameters() { + return Arrays.asList(new Object[][]{{"CORE", 2000, 2, 2, 500}, {"AMQP", 2000, 2, 2, 500}}); + } + + public MMSFactoryTest(String protocol, int batchSize, int restarts, int clientRuns, int killClientEveryX) { + this.theprotocol = protocol; + this.BATCH_SIZE = batchSize; + this.restarts = restarts; + this.clientRuns = clientRuns; + this.killClientEveryX = killClientEveryX; + } + + + public static void main(String[] arg) { + + try { + Consumer consumer = new Consumer(arg[0], Integer.parseInt(arg[1]), arg[2], Integer.parseInt(arg[3]), arg[4], Integer.parseInt(arg[5])); + //consumer.run(); + consumer.runListener(); + + while (true) { + Thread.sleep(10_000); + } + } catch (Throwable e) { + System.exit(1); + } + } + + public static String getConsumerLog(int id) { + return getServerLocation(SERVER_NAME_0) + "/data/" + "Consumer" + id + ".log"; + } + + public static ConnectionFactory createConnectionFactory(String protocol, String uri) { + if (protocol.toUpperCase().equals("OPENWIRE")) { + return new org.apache.activemq.ActiveMQConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("AMQP")) { + + if (uri.startsWith("tcp://")) { + // replacing tcp:// by amqp:// + uri = "amqp" + uri.substring(3); + } + return new JmsConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri); + } else { + throw new IllegalStateException("Unkown:" + protocol); + } + } + + Process startConsumerProcess(String protocol, + int slowTime, + String queueName, + int credits, + int consumerID) throws Exception { + + return SpawnedVMSupport.spawnVM(MMSFactoryTest.class.getName(), protocol, "" + slowTime, queueName, "" + credits, getConsumerLog(consumerID), "" + consumerID); + } + + Process serverProcess; + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + disableCheckThread(); + serverProcess = startServer(SERVER_NAME_0, 0, 30000); + } + + @Test + public void testMMSorting() throws Exception { + for (int i = 0; i < restarts; i++) { + logger.debug("*******************************************************************************************************************************"); + logger.debug("Starting " + clientRuns); + logger.debug("*******************************************************************************************************************************"); + testMMSorting(clientRuns * i, clientRuns * (i + 1)); + + stopServerWithFile(getServerLocation(SERVER_NAME_0)); + Thread.sleep(1000); + + try { + serverProcess.destroyForcibly(); + } catch (Throwable ignored) { + } + serverProcess = startServer(SERVER_NAME_0, 0, 30000); + } + } + + public void testMMSorting(int countStart, int countEnd) throws Exception { + + JMXConnector jmxConnector = getJmxConnector(JMX_SERVER_HOSTNAME, JMX_SERVER_PORT); + + MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); + String brokerName = "0.0.0.0"; // configured e.g. in broker.xml element + ObjectNameBuilder objectNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), brokerName, true); + //ActiveMQServerControl activeMQServerControl = MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectNameBuilder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false); + + ObjectName queueObjectName = objectNameBuilder.getQueueObjectName(SimpleString.toSimpleString("MMFactory"), SimpleString.toSimpleString("MMConsumer"), RoutingType.MULTICAST); + QueueControl queueControl = MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, queueObjectName, QueueControl.class, false); + + final int NUMBER_OF_CONSUMERS = 6; + final int NUMBER_OF_FASTCONSUMERS = 0; // not using at the moment + + Process[] consumers = new Process[NUMBER_OF_CONSUMERS + NUMBER_OF_FASTCONSUMERS]; + int[] timeForConsumers = new int[NUMBER_OF_CONSUMERS + NUMBER_OF_FASTCONSUMERS]; + + for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) { + timeForConsumers[i] = (i % 2 == 0 ? 200 : 300); + } + + timeForConsumers[1] = 100; + timeForConsumers[5] = 500; + + for (int i = NUMBER_OF_CONSUMERS; i < NUMBER_OF_CONSUMERS + NUMBER_OF_FASTCONSUMERS; i++) { + timeForConsumers[i] = 0; + } + + for (int i = 0; i < consumers.length; i++) { + consumers[i] = startConsumerProcess(theprotocol, timeForConsumers[i], "MMFactory::MMConsumer", 100, i); + } + + Process dlqProcess = startConsumerProcess(theprotocol, 0, "DLQ", 100, 1000); + + AtomicInteger retryNumber = new AtomicInteger(0); + int expectedTotalSize = 0; + + ConnectionFactory factory = createConnectionFactory(theprotocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic queue = session.createTopic("MMFactory"); + MessageProducer mmsFactory = session.createProducer(queue); + + Topic controlTopic = session.createTopic("MMControl"); + + + String largeString; + { + StringBuffer largeStringBuffer = new StringBuffer(); + + while (largeStringBuffer.length() < 10) { + largeStringBuffer.append(RandomUtil.randomString()); + } + largeString = largeStringBuffer.toString(); + } + + try { + for (int run = countStart; run <= countEnd; run++) { + AtomicInteger lastTime = new AtomicInteger((int)queueControl.getMessagesAcknowledged()); + for (int i = 0; i < BATCH_SIZE; i++) { + if (i > 0 && i % killClientEveryX == 0) { + System.out.println("REconnecting..."); + logger.debug("Reconnecting..."); + consumers[0].destroyForcibly(); + consumers[0] = startConsumerProcess(theprotocol, timeForConsumers[0], "MMFactory::MMConsumer", 100, 0); + consumers[1].destroyForcibly(); + consumers[1] = startConsumerProcess(theprotocol, timeForConsumers[1], "MMFactory::MMConsumer", 100, 1); + logger.debug("...Reconnected"); + logger.debug("retry=" + retryNumber + ",sent=" + i + ", acked on this batch = " + (queueControl.getMessagesAcknowledged() - (retryNumber.get() * BATCH_SIZE * 2)) + ", total acked = " + queueControl.getMessagesAcknowledged()); + } + TextMessage message = session.createTextMessage("This is blue " + largeString); + message.setStringProperty("color", "blue"); + message.setIntProperty("i", i); + mmsFactory.send(message); + + message = session.createTextMessage("This is red " + largeString); + message.setStringProperty("color", "red"); + message.setIntProperty("i", i); + mmsFactory.send(message); + + if (i % 10 == 0) { + logger.debug("Sending " + i + " run = " + run); + } + + if (i == 0) { + waitForAckChange(queueControl, lastTime); + } + } + + Session sessionControl = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producerControl = sessionControl.createProducer(controlTopic); + Message controlmessage = sessionControl.createMessage(); + controlmessage.setStringProperty("control", "flush"); + producerControl.send(controlmessage); + sessionControl.commit(); + sessionControl.close(); + + Wait.assertTrue(() -> { + // We will wait a bit here until it's a least a bit closer to the whole Batch + if ((queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled() + queueControl.getMessagesExpired()) - (retryNumber.get() * BATCH_SIZE * 2) > (BATCH_SIZE * 2 - 500)) { + return true; + } else { + logger.debug("Received " + queueControl.getMessagesAcknowledged()); + return false; + } + }, 45_000, 1_000); + + expectedTotalSize += BATCH_SIZE * 2; + Wait.assertEquals(expectedTotalSize, queueControl::getMessagesAdded); + + Wait.assertEquals(expectedTotalSize, () -> queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled()); + retryNumber.incrementAndGet(); + + for (Process c : consumers) { + c.destroyForcibly(); + } + for (int i = 0; i < consumers.length; i++) { + File file = new File(getConsumerLog(i)); + if (!file.delete()) { + logger.debug("not possible to remove " + file); + } + } + for (int r = 0; r < consumers.length; r++) { + consumers[r] = startConsumerProcess(theprotocol, timeForConsumers[r], "MMFactory::MMConsumer", 100, r); + } + } + + Thread.sleep(1000); + } finally { + for (Process c : consumers) { + c.destroyForcibly(); + } + + dlqProcess.destroyForcibly(); + + for (int i = 0; i < consumers.length; i++) { + File file = new File(getConsumerLog(i)); + if (!file.delete()) { + logger.warn("not possible to remove " + file); + } + } + + File file = new File(getConsumerLog(1000)); //the DLQ processing ID used + if (!file.delete()) { + logger.warn("not possible to remove " + file); + } + } + } + + } + + private void waitForAckChange(QueueControl queueControl, AtomicInteger lastTime) throws Exception { + Wait.waitFor(() -> { + + if (lastTime.get() == queueControl.getMessagesAcknowledged()) { + logger.debug("Waiting some change on " + queueControl.getMessagesAcknowledged() + " with messages Added = " + queueControl.getMessagesAdded() + " and killed = " + queueControl.getMessagesKilled()); + return false; + } else { + logger.debug("Condition met! with " + queueControl.getMessagesAcknowledged() + " with messages Added = " + queueControl.getMessagesAdded() + " and killed = " + queueControl.getMessagesKilled()); + lastTime.set((int)queueControl.getMessagesAcknowledged()); + return true; + } + }, 5_000); + } + + static class Consumer implements MessageListener { + + boolean clientAck = false; + volatile int slowTime; + final String queuename; + final int credits; + final String protocol; + final int id; + ConnectionFactory factory; + Connection connection; + Session session; + MessageConsumer consumer; + MessageConsumer controlConsumer; + Queue queue; + + Session sessionControl; + + PrintStream fileStream; + + Consumer(String protocol, int slowTime, String queueName, int credits, String logOutput, int id) throws Exception { + this.slowTime = slowTime; + this.queuename = queueName; + this.credits = credits; + this.protocol = protocol; + fileStream = new PrintStream(new FileOutputStream(logOutput, true), true); + this.id = id; + } + + @Override + public void onMessage(Message message) { + try { + String color = message.getStringProperty("color"); + int messageSequence = message.getIntProperty("i"); + if (queuename.equals("DLQ")) { + logger.debug("Processing DLQ on color=" + color + ", sequence=" + messageSequence); + } else if (slowTime > 0) { + Thread.sleep(slowTime); + } + + if (clientAck) { + message.acknowledge(); + } + + fileStream.println(color + ";" + messageSequence); + } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); + } + + } + + class ControlListener implements MessageListener { + + @Override + public void onMessage(Message message) { + try { + + // This is received at the client, on a remote machine, System.out is the best option to log here + System.out.println("Received control message"); + if (message.getStringProperty("control").equals("flush")) { + Consumer.this.slowTime = 0; + System.out.println("Setting slow time to 0"); + } + sessionControl.commit(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + + public void runListener() { + + //factory = createConnectionFactory(protocol, "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + credits); + factory = createConnectionFactory(protocol, "tcp://localhost:61616"); + + System.out.println("Starting"); + connect(); + try { + consumer.setMessageListener(Consumer.this); + } catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + } + + private void connect() { + try { + if (connection != null) { + connection.close(); + } + + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, clientAck ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(queuename); + consumer = session.createConsumer(queue); + + sessionControl = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = sessionControl.createTopic("MMControl"); + controlConsumer = sessionControl.createSharedDurableConsumer(topic, "consumer" + id); + controlConsumer.setMessageListener(new ControlListener()); + + + } catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + } + } + +}