diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index ea9e741289..c703928c2d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -203,6 +203,9 @@ public class Create extends InputAbstract { @Option(name = "--java-options", description = "Extra java options to be passed to the profile") private String javaOptions = ""; + @Option(name = "--java-memory", description = "Define the -Xmx memory parameter for the broker. Default = '2G'") + private String javaMemory = "2G"; + @Option(name = "--allow-anonymous", description = "Enables anonymous configuration on security, opposite of --require-login (Default: input)") private Boolean allowAnonymous = null; @@ -797,6 +800,7 @@ public class Create extends InputAbstract { filters.put("${java-opts}", javaOptions); + filters.put("${java-memory}", javaMemory); if (allowAnonymous) { write(ETC_LOGIN_CONFIG_WITH_GUEST, new File(etcFolder, ETC_LOGIN_CONFIG), filters, false); diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile index 4e31518c35..42358eb8ab 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile @@ -37,7 +37,7 @@ HAWTIO_ROLE='${role}' # Java Opts if [ -z "$JAVA_ARGS" ]; then - JAVA_ARGS="-XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx2G -Dhawtio.disableProxy=true -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml ${java-opts}" + JAVA_ARGS="-XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx${java-memory} -Dhawtio.disableProxy=true -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=${ARTEMIS_INSTANCE_ETC_URI}jolokia-access.xml ${java-opts}" fi # Uncomment to enable logging for Safepoint JVM pauses diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd index b38d970efa..729365716c 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile.cmd @@ -33,7 +33,7 @@ rem Cluster Properties: Used to pass arguments to ActiveMQ Artemis which can be rem set ARTEMIS_CLUSTER_PROPS=-Dactivemq.remoting.default.port=61617 -Dactivemq.remoting.amqp.port=5673 -Dactivemq.remoting.stomp.port=61614 -Dactivemq.remoting.hornetq.port=5446 rem Java Opts -IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx1024M -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.disableProxy=true -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%) +IF "%JAVA_ARGS%"=="" (set JAVA_ARGS=${java-opts} -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx${java-memory} -Xbootclasspath/a:%ARTEMIS_HOME%\lib\${logmanager};%ARTEMIS_HOME%\lib\${wildfly-common} -Djava.security.auth.login.config=%ARTEMIS_ETC_DIR%\login.config -Dhawtio.disableProxy=true -Dhawtio.offline=true -Dhawtio.realm=activemq -Dhawtio.role=${role} -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Djolokia.policyLocation=%ARTEMIS_INSTANCE_ETC_URI%\jolokia-access.xml -Dartemis.instance=%ARTEMIS_INSTANCE%) rem Logs Safepoints JVM pauses: Uncomment to enable them rem In addition to the traditional GC logs you could enable some JVM flags to know any meaningful and "hidden" pause that could diff --git a/scripts/one-test.sh b/scripts/one-test.sh index 583cd62c15..beb38c6c67 100755 --- a/scripts/one-test.sh +++ b/scripts/one-test.sh @@ -16,4 +16,4 @@ # specific language governing permissions and limitations # under the License. -mvn -Ptests -DfailIfNoTests=false -Ptests-retry -Ptests-CI -Pextra-tests -DskipStyleCheck=true -DskipPerformanceTests=false -Dtest=$1 test +mvn -Ptests -DfailIfNoTests=false -Ptests-retry -Pextra-tests -DskipStyleCheck=true -DskipPerformanceTests=false -DskipSoakTests=false -Dtest=$1 test diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml index 1e1d58334d..bc5cfe77fd 100644 --- a/tests/soak-tests/pom.xml +++ b/tests/soak-tests/pom.xml @@ -46,11 +46,9 @@ test - org.apache.activemq.tests - unit-tests + org.apache.activemq + artemis-unit-test-support ${project.version} - test - test-jar org.apache.activemq.tests @@ -84,6 +82,34 @@ + + org.apache.activemq + artemis-maven-plugin + ${project.version} + + + test-compile + create-horizontal-paging + + create + + + amq + admin + admin + true + false + ${basedir}/target/horizontalPaging + ${basedir}/target/classes/servers/horizontalPaging + + --java-memory + 10G + + + + + + org.apache.maven.plugins maven-surefire-plugin diff --git a/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml b/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml new file mode 100644 index 0000000000..92d3bfa96a --- /dev/null +++ b/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml @@ -0,0 +1,273 @@ + + + + + + + + 0.0.0.0 + + + true + + + ASYNCIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + + + + + + true + + 2 + + 10 + + 4096 + + 10M + + -1 + + 5 + + + 96000 + + + + 4096 + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + 8332000 + + + + + + -1 + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + + + + -1 + + 0 + + 5M + + 50 + + 500K + 10 + PAGE + true + true + false + false + + + + +
+ + + +
+
+ + + +
+ +
+ + + + +
+
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java new file mode 100644 index 0000000000..5762670c01 --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak; + +import javax.management.MBeanServerInvocationHandler; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.cli.commands.Stop; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.util.ServerUtil; +import org.junit.After; +import org.junit.Assert; + +public class SoakTestBase extends ActiveMQTestBase { + Set processes = new HashSet<>(); + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT = 10099; + + public static final String basedir = System.getProperty("basedir"); + + @After + public void after() throws Exception { + for (Process process : processes) { + try { + ServerUtil.killServer(process, true); + } catch (Throwable e) { + e.printStackTrace(); + } + } + processes.clear(); + } + + public void killServer(Process process) { + processes.remove(process); + try { + ServerUtil.killServer(process); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + protected static void stopServerWithFile(String serverLocation) throws IOException { + File serverPlace = new File(serverLocation); + File etcPlace = new File(serverPlace, "etc"); + File stopMe = new File(etcPlace, Stop.STOP_FILE_NAME); + Assert.assertTrue(stopMe.createNewFile()); + } + + public static String getServerLocation(String serverName) { + return basedir + "/target/" + serverName; + } + + public static void cleanupData(String serverName) { + String location = getServerLocation(serverName); + deleteDirectory(new File(location, "data")); + deleteDirectory(new File(location, "log")); + } + + public void addProcess(Process process) { + processes.add(process); + } + + public Process startServer(String serverName, int portID, int timeout) throws Exception { + Process process = ServerUtil.startServer(getServerLocation(serverName), serverName, portID, timeout); + addProcess(process); + return process; + } + + public Process startServer(String serverName, String uri, int timeout) throws Exception { + Process process = ServerUtil.startServer(getServerLocation(serverName), serverName, uri, timeout); + addProcess(process); + return process; + } + + protected JMXConnector getJmxConnector() throws MalformedURLException { + return getJmxConnector(JMX_SERVER_HOSTNAME, JMX_SERVER_PORT); + } + + protected static JMXConnector newJMXFactory(String uri) throws Throwable { + return JMXConnectorFactory.connect(new JMXServiceURL(uri)); + } + + protected static ActiveMQServerControl getServerControl(String uri, + ObjectNameBuilder builder, + long timeout) throws Throwable { + long expireLoop = System.currentTimeMillis() + timeout; + Throwable lastException = null; + do { + try { + JMXConnector connector = newJMXFactory(uri); + + ActiveMQServerControl serverControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), builder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false); + serverControl.isActive(); // making one call to make sure it's working + return serverControl; + } catch (Throwable e) { + lastException = e; + Thread.sleep(500); + } + } + while (expireLoop > System.currentTimeMillis()); + + throw lastException; + } + + protected static JMXConnector getJmxConnector(String hostname, int port) throws MalformedURLException { + // Without this, the RMI server would bind to the default interface IP (the user's local IP mostly) + System.setProperty("java.rmi.server.hostname", hostname); + + // I don't specify both ports here manually on purpose. See actual RMI registry connection port extraction below. + String urlString = "service:jmx:rmi:///jndi/rmi://" + hostname + ":" + port + "/jmxrmi"; + + JMXServiceURL url = new JMXServiceURL(urlString); + JMXConnector jmxConnector = null; + + try { + jmxConnector = JMXConnectorFactory.connect(url); + System.out.println("Successfully connected to: " + urlString); + } catch (Exception e) { + jmxConnector = null; + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + return jmxConnector; + } + + protected static final void recreateBrokerDirectory(final String homeInstance) { + recreateDirectory(homeInstance + "/data"); + recreateDirectory(homeInstance + "/log"); + } + + + public boolean waitForServerToStart(String uri, String username, String password, long timeout) throws InterruptedException { + long realTimeout = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < realTimeout) { + try (ActiveMQConnectionFactory cf = ActiveMQJMSClient.createConnectionFactory(uri, null)) { + cf.createConnection(username, password).close(); + System.out.println("server " + uri + " started"); + } catch (Exception e) { + System.out.println("awaiting server " + uri + " start at "); + Thread.sleep(500); + continue; + } + return true; + } + + return false; + } + + protected void checkLogRecord(File logFile, boolean exist, String... values) throws Exception { + Assert.assertTrue(logFile.exists()); + boolean hasRecord = false; + try (BufferedReader reader = new BufferedReader(new FileReader(logFile))) { + String line = reader.readLine(); + while (line != null) { + if (line.contains(values[0])) { + boolean hasAll = true; + for (int i = 1; i < values.length; i++) { + if (!line.contains(values[i])) { + hasAll = false; + break; + } + } + if (hasAll) { + hasRecord = true; + System.out.println("audit has it: " + line); + break; + } + } + line = reader.readLine(); + } + if (exist) { + Assert.assertTrue(hasRecord); + } else { + Assert.assertFalse(hasRecord); + } + } + } + +} diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/TestParameters.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/TestParameters.java new file mode 100644 index 0000000000..c11421dc0b --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/TestParameters.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak; + +import org.jboss.logging.Logger; + +/** Encapsulates System properties that could be passed on to the test. */ +public class TestParameters { + + + private static final Logger logger = Logger.getLogger(TestParameters.class); + + private static String propertyName(String testName, String property) { + return "TEST_" + testName + "_" + property; + } + + public static int testProperty(String testName, String property, int defaultValue) { + try { + return Integer.parseInt(testProperty(testName, property, Integer.toString(defaultValue))); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + return defaultValue; + } + } + + public static String testProperty(String testName, String property, String defaultValue) { + + property = propertyName(testName, property); + + String value = System.getenv(property); + if (value == null) { + value = System.getProperty(property); + } + + if (value == null) { + logger.debug("System property '" + property + "' not defined, using default:" + defaultValue); + value = defaultValue; + } else { + logger.debug("Using " + property + "=" + value); + } + + logger.info(property + "=" + value); + + return value; + + } + + + +} diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientAbstract.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientAbstract.java index 97849f3feb..5572cf2fc9 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientAbstract.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientAbstract.java @@ -36,10 +36,8 @@ import org.jboss.logging.Logger; */ public abstract class ClientAbstract extends Thread { - private static final Logger log = Logger.getLogger(ClientAbstract.class); - protected ClientSession session; protected final ClientSessionFactory sf; @@ -56,12 +54,10 @@ public abstract class ClientAbstract extends Thread { */ protected volatile boolean pendingCommit = false; - public ClientAbstract(ClientSessionFactory sf) { this.sf = sf; } - public ClientSession getConnection() { return session; } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientNonDivertedSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientNonDivertedSoakTest.java index 2e4f5881cc..135cd5a367 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientNonDivertedSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientNonDivertedSoakTest.java @@ -30,11 +30,13 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.logging.Logger; import org.junit.Before; import org.junit.Test; public class ClientNonDivertedSoakTest extends ActiveMQTestBase { + private static final Logger logger = Logger.getLogger(ClientNonDivertedSoakTest.class); private static final SimpleString ADDRESS = new SimpleString("ADD"); @@ -94,7 +96,7 @@ public class ClientNonDivertedSoakTest extends ActiveMQTestBase { producer.send(msg); if (i % 1000 == 0) { - System.out.println("Sent " + i + " messages"); + logger.info("Sent " + i + " messages"); session.commit(); } } @@ -111,14 +113,14 @@ public class ClientNonDivertedSoakTest extends ActiveMQTestBase { send.start(); rec1.start(); - long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1); + long timeEnd = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ClientParameters.TIME_LIMIT_SECONDS); while (timeEnd > System.currentTimeMillis()) { if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0) { - System.out.println("There are sequence errors in some of the clients, please look at the logs"); + logger.info("There are sequence errors in some of the clients, please look at the logs"); break; } - System.out.println("count = " + send.msgs); + logger.info("count = " + send.msgs); Thread.sleep(10000); } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientParameters.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientParameters.java new file mode 100644 index 0000000000..5f2ac8e4fd --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientParameters.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.client; + +import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; + +public class ClientParameters { + private static final String TEST_NAME = "CLIENT"; + public static final int TIME_LIMIT_SECONDS = testProperty(TEST_NAME, "TIME_LIMIT_SECONDS", 60); + public static final int TEST_REPETITION = testProperty(TEST_NAME, "REPETITIONS", 10); + +} diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientSoakTest.java index 29164c0482..56fd11212e 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientSoakTest.java @@ -32,12 +32,13 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.logging.Logger; import org.junit.Before; import org.junit.Test; public class ClientSoakTest extends ActiveMQTestBase { - + private static final Logger logger = Logger.getLogger(ClientSoakTest.class); private static final SimpleString ADDRESS = new SimpleString("ADD"); @@ -115,7 +116,7 @@ public class ClientSoakTest extends ActiveMQTestBase { producer.send(msg); if (i % 1000 == 0) { - System.out.println("Sent " + i + " messages"); + logger.info("Sent " + i + " messages"); session.commit(); } } @@ -134,10 +135,10 @@ public class ClientSoakTest extends ActiveMQTestBase { rec1.start(); rec2.start(); - long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1); + long timeEnd = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ClientParameters.TIME_LIMIT_SECONDS); while (timeEnd > System.currentTimeMillis()) { if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0) { - System.out.println("There are sequence errors in some of the clients, please look at the logs"); + logger.info("There are sequence errors in some of the clients, please look at the logs"); break; } Thread.sleep(10000); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/Receiver.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/Receiver.java index d4dbcae977..9d99a04675 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/Receiver.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/Receiver.java @@ -24,9 +24,11 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; public class Receiver extends ClientAbstract { + private static final Logger logger = Logger.getLogger(Receiver.class); // We should leave some messages on paging. We don't want to consume all for this test private final Semaphore minConsume = new Semaphore(0); @@ -72,7 +74,7 @@ public class Receiver extends ClientAbstract { if (msg.getLongProperty("count") != msgs + pendingMsgs) { errors++; - System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queue); + logger.info("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queue); } pendingMsgs++; diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java index 0daa8cd730..110e995afd 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java @@ -18,11 +18,16 @@ package org.apache.activemq.artemis.tests.soak.failover; import org.apache.activemq.artemis.tests.integration.cluster.reattach.RandomReattachTest; +import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; + public class RandomFailoverSoakTest extends RandomReattachTest { + private static final String TEST_NAME = "RANDOM"; + public static final int TEST_REPETITION = testProperty(TEST_NAME, "TEST_REPETITION", 100); + @Override protected int getNumIterations() { - return 500; + return TEST_REPETITION; } } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java new file mode 100644 index 0000000000..d7485e7d0b --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.soak.TestParameters.testProperty; + +/** It is recommended to set the following System properties before running this test: + * + * export TEST_HORIZONTAL_DESTINATIONS=500 + * export TEST_HORIZONTAL_MESSAGES=500 + * export TEST_HORIZONTAL_COMMIT_INTERVAL=100 + * export TEST_HORIZONTAL_SIZE=60000 + * + * #You may choose to use zip files to save some time on producing if you want to run this test over and over when debugging + * export TEST_HORIZONTAL_ZIP_LOCATION=a folder + * */ +@RunWith(Parameterized.class) +public class HorizontalPagingTest extends SoakTestBase { + + private static final String TEST_NAME = "HORIZONTAL"; + + private final String protocol; + private static final String ZIP_LOCATION = testProperty(TEST_NAME, "ZIP_LOCATION", null); + private static final int SERVER_START_TIMEOUT = testProperty(TEST_NAME, "SERVER_START_TIMEOUT", 300_000); + private static final int TIMEOUT_MINUTES = testProperty(TEST_NAME, "TIMEOUT_MINUTES", 120); + private static final String PROTOCOL_LIST = testProperty(TEST_NAME, "PROTOCOL_LIST", "OPENWIRE,CORE,AMQP"); + private static final int PRINT_INTERVAL = testProperty(TEST_NAME, "PRINT_INTERVAL", 100); + + private final int DESTINATIONS; + private final int MESSAGES; + private final int COMMIT_INTERVAL; + // if 0 will use AUTO_ACK + private final int RECEIVE_COMMIT_INTERVAL; + private final int MESSAGE_SIZE; + private final int PARALLEL_SENDS; + + + private static final Logger logger = Logger.getLogger(HorizontalPagingTest.class); + + public static final String SERVER_NAME_0 = "horizontalPaging"; + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection parameters() { + String[] protocols = PROTOCOL_LIST.split(","); + + ArrayList parameters = new ArrayList<>(); + for (String str : protocols) { + logger.info("Adding " + str + " to the list for the test"); + parameters.add(new Object[]{str}); + } + + return parameters; + } + + public HorizontalPagingTest(String protocol) { + this.protocol = protocol; + DESTINATIONS = testProperty(TEST_NAME, protocol + "_DESTINATIONS", 10); + MESSAGES = testProperty(TEST_NAME, protocol + "_MESSAGES", 100); + COMMIT_INTERVAL = testProperty(TEST_NAME, protocol + "_COMMIT_INTERVAL", 10); + // if 0 will use AUTO_ACK + RECEIVE_COMMIT_INTERVAL = testProperty(TEST_NAME, protocol + "_RECEIVE_COMMIT_INTERVAL", 1); + MESSAGE_SIZE = testProperty(TEST_NAME, protocol + "_MESSAGE_SIZE", 60_000); + PARALLEL_SENDS = testProperty(TEST_NAME, protocol + "_PARALLEL_SENDS", 2); + } + + Process serverProcess; + + boolean unzipped = false; + + private String getZipName() { + return "data-" + protocol + "-" + DESTINATIONS + "-" + MESSAGES + "-" + MESSAGE_SIZE + ".zip"; + } + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + + boolean useZip = ZIP_LOCATION != null; + String zipName = getZipName(); + File zipFile = useZip ? new File(ZIP_LOCATION + "/" + zipName) : null; + + if (ZIP_LOCATION != null && zipFile.exists()) { + unzipped = true; + System.out.println("Invoking unzip"); + ProcessBuilder zipBuilder = new ProcessBuilder("unzip", zipFile.getAbsolutePath()).directory(new File(getServerLocation(SERVER_NAME_0))); + + Process process = zipBuilder.start(); + SpawnedVMSupport.startLogger("zip", process); + System.out.println("Zip finished with " + process.waitFor()); + } + + serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT); + } + + + @Test + public void testHorizontal() throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + AtomicInteger errors = new AtomicInteger(0); + + ExecutorService service = Executors.newFixedThreadPool(DESTINATIONS); + runAfter(service::shutdownNow); + + if (!unzipped) { + Connection connection = factory.createConnection(); + runAfter(connection::close); + + String text; + { + StringBuffer buffer = new StringBuffer(); + while (buffer.length() < MESSAGE_SIZE) { + buffer.append("a big string..."); + } + + text = buffer.toString(); + } + + ReusableLatch latchDone = new ReusableLatch(0); + + + for (int i = 0; i < DESTINATIONS; i++) { + latchDone.countUp(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("queue_" + i); + service.execute(() -> { + try { + logger.info("*******************************************************************************************************************************\ndestination " + queue.getQueueName()); + MessageProducer producer = session.createProducer(queue); + for (int m = 0; m < MESSAGES; m++) { + producer.send(session.createTextMessage(text)); + if (m > 0 && m % COMMIT_INTERVAL == 0) { + logger.info("Sent " + m + " " + protocol + " messages on queue " + queue.getQueueName()); + session.commit(); + } + } + + session.commit(); + session.close(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latchDone.countDown(); + } + }); + + if ((i + 1) % PARALLEL_SENDS == 0) { + latchDone.await(); + } + } + latchDone.await(); + + connection.close(); + + + killServer(serverProcess); + } + + + if (ZIP_LOCATION != null && !unzipped) { + String fileName = getZipName(); + logger.info("Zipping data folder for " + protocol + " as " + fileName); + ProcessBuilder zipBuilder = new ProcessBuilder("zip", "-r", ZIP_LOCATION + "/" + getZipName(), "data").directory(new File(getServerLocation(SERVER_NAME_0))); + Process process = zipBuilder.start(); + SpawnedVMSupport.startLogger("zip", process); + System.out.println("Zip finished with " + process.waitFor()); + } + + serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT); + + Connection connectionConsumer = factory.createConnection(); + + runAfter(connectionConsumer::close); + + AtomicInteger completedFine = new AtomicInteger(0); + + for (int i = 0; i < DESTINATIONS; i++) { + int destination = i; + service.execute(() -> { + try { + Session sessionConsumer; + + if (RECEIVE_COMMIT_INTERVAL <= 0) { + sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); + } else { + sessionConsumer = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED); + } + + MessageConsumer messageConsumer = sessionConsumer.createConsumer(sessionConsumer.createQueue("queue_" + destination)); + for (int m = 0; m < MESSAGES; m++) { + TextMessage message = (TextMessage) messageConsumer.receive(50_000); + if (message == null) { + m--; + continue; + } + + // The sending commit interval here will be used for printing + if (PRINT_INTERVAL > 0 && m % PRINT_INTERVAL == 0) { + logger.info("Destination " + destination + " received " + m + " " + protocol + " messages"); + } + + if (RECEIVE_COMMIT_INTERVAL > 0 && (m + 1) % RECEIVE_COMMIT_INTERVAL == 0) { + sessionConsumer.commit(); + } + } + + if (RECEIVE_COMMIT_INTERVAL > 0) { + sessionConsumer.commit(); + } + + completedFine.incrementAndGet(); + + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } + }); + } + + connectionConsumer.start(); + + service.shutdown(); + Assert.assertTrue("Test Timed Out", service.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES)); + Assert.assertEquals(0, errors.get()); + Assert.assertEquals(DESTINATIONS, completedFine.get()); + + connectionConsumer.close(); + } + +} diff --git a/tests/soak-tests/src/test/scripts/parameters-paging-short.sh b/tests/soak-tests/src/test/scripts/parameters-paging-short.sh new file mode 100755 index 0000000000..1c2774de74 --- /dev/null +++ b/tests/soak-tests/src/test/scripts/parameters-paging-short.sh @@ -0,0 +1,50 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setting the script to fail if anything goes wrong +set -e + +# this script contains a suggest set of variables to run the HorizontalPagingTest in a medium environment and hit some issues we used to have with paging + +# It is possible to save the producer's time. If you set this variable the test will reuse previously sent data by zip and unzipping the data folder +export TEST_HORIZONTAL_ZIP_LOCATION=/tmp + +export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000 +export TEST_HORIZONTAL_TIMEOUT_MINUTES=120 +export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP + +export TEST_HORIZONTAL_CORE_DESTINATIONS=2 +export TEST_HORIZONTAL_CORE_MESSAGES=1000 +export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=10 + +export TEST_HORIZONTAL_AMQP_DESTINATIONS=2 +export TEST_HORIZONTAL_AMQP_MESSAGES=1000 +export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10 + +export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=2 +export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000 +export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10 diff --git a/tests/soak-tests/src/test/scripts/parameters-paging.sh b/tests/soak-tests/src/test/scripts/parameters-paging.sh new file mode 100755 index 0000000000..b2f2237223 --- /dev/null +++ b/tests/soak-tests/src/test/scripts/parameters-paging.sh @@ -0,0 +1,50 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Setting the script to fail if anything goes wrong +set -e + +# this script contains a suggest set of variables to run the HorizontalPagingTest in a medium environment and hit some issues we used to have with paging + +# It is possible to save the producer's time. If you set this variable the test will reuse previously sent data by zip and unzipping the data folder +#export TEST_HORIZONTAL_ZIP_LOCATION=/place/to/my/zip + +export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000 +export TEST_HORIZONTAL_TIMEOUT_MINUTES=120 +export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE,CORE,AMQP + +export TEST_HORIZONTAL_CORE_DESTINATIONS=200 +export TEST_HORIZONTAL_CORE_MESSAGES=1000 +export TEST_HORIZONTAL_CORE_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_CORE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_CORE_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_CORE_PARALLEL_SENDS=10 + +export TEST_HORIZONTAL_AMQP_DESTINATIONS=200 +export TEST_HORIZONTAL_AMQP_MESSAGES=1000 +export TEST_HORIZONTAL_AMQP_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_AMQP_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_AMQP_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_AMQP_PARALLEL_SENDS=10 + +export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=200 +export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000 +export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100 +export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0 +export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000 +export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10 \ No newline at end of file