NO-JIRA Improving SoakPagingTest

This commit is contained in:
Clebert Suconic 2020-12-21 12:59:40 -05:00
parent 6d10724d05
commit e3670b64e5
2 changed files with 36 additions and 20 deletions

View File

@ -16,4 +16,4 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
mvn -Ptests -DfailIfNoTests=false -Pextra-tests -DskipStyleCheck=true -DskipPerformanceTests=false -Dtest=$1 test mvn -Ptests -DfailIfNoTests=false -Ptests-retry -Ptests-CI -Pextra-tests -DskipStyleCheck=true -DskipPerformanceTests=false -Dtest=$1 test

View File

@ -52,12 +52,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.ExecuteUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.QoS;
import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -68,8 +69,7 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class SoakPagingTest extends SmokeTestBase { public class SoakPagingTest extends SmokeTestBase {
@Rule private static final Logger log = Logger.getLogger(SoakPagingTest.class);
public RetryRule retryRule = new RetryRule(1);
public static final int LAG_CONSUMER_TIME = 1000; public static final int LAG_CONSUMER_TIME = 1000;
public static final int TIME_RUNNING = 4000; public static final int TIME_RUNNING = 4000;
@ -125,13 +125,7 @@ public class SoakPagingTest extends SmokeTestBase {
} else if (protocol.toUpperCase().equals("MQTT")) { } else if (protocol.toUpperCase().equals("MQTT")) {
return new MQTTCF(); return new MQTTCF();
} else if (protocol.toUpperCase().equals("AMQP")) { } else if (protocol.toUpperCase().equals("AMQP")) {
return new JmsConnectionFactory("failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.maxReconnectAttempts=16&jms.prefetchPolicy.all=5&jms.forceSyncSend=true");
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri); return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else { } else {
@ -187,23 +181,23 @@ public class SoakPagingTest extends SmokeTestBase {
t.start(); t.start();
} }
System.out.println("Awaiting producers..."); log.debug("Awaiting producers...");
if (!producersLatch.await(30000, TimeUnit.MILLISECONDS)) { if (!producersLatch.await(30000, TimeUnit.MILLISECONDS)) {
System.err.println("Awaiting producers timeout"); System.err.println("Awaiting producers timeout");
System.exit(0); System.exit(0);
} }
System.out.println("Awaiting consumers..."); log.debug("Awaiting consumers...");
if (!consumersLatch.await(30000, TimeUnit.MILLISECONDS)) { if (!consumersLatch.await(30000, TimeUnit.MILLISECONDS)) {
System.err.println("Awaiting consumers timeout"); System.err.println("Awaiting consumers timeout");
System.exit(0); System.exit(0);
} }
System.out.println("Awaiting timeout..."); log.debug("Awaiting timeout...");
Thread.sleep(time); Thread.sleep(time);
int exitStatus = consumed.get() > 0 ? 1 : 0; int exitStatus = consumed.get() > 0 ? 1 : 0;
System.out.println("Exiting with the status: " + exitStatus); log.debug("Exiting with the status: " + exitStatus);
System.exit(exitStatus); System.exit(exitStatus);
} catch (Throwable t) { } catch (Throwable t) {
System.err.println("Exiting with the status 0. Reason: " + t); System.err.println("Exiting with the status 0. Reason: " + t);
@ -222,10 +216,32 @@ public class SoakPagingTest extends SmokeTestBase {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "" + TIME_RUNNING, "" + transaction); Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "" + TIME_RUNNING, "" + transaction);
int result = process.waitFor(); int result = process.waitFor();
if (result <= 0) {
jstack();
}
Assert.assertTrue(result > 0); Assert.assertTrue(result > 0);
} }
} }
protected void jstack() throws Exception {
try {
System.out.println("*******************************************************************************************************************************");
System.out.println("SERVER 0 jstack");
System.out.println("*******************************************************************************************************************************");
ExecuteUtil.runCommand(true, 1, TimeUnit.MINUTES, "jstack", "" + ExecuteUtil.getPID(server0));
} catch (Throwable e) {
log.warn("Error executing jstack on Server 0", e);
}
try {
System.out.println("*******************************************************************************************************************************");
System.out.println("SERVER 1 jstack");
System.out.println("*******************************************************************************************************************************");
ExecuteUtil.runCommand(true, 1, TimeUnit.MINUTES, "jstack", "" + ExecuteUtil.getPID(server1));
} catch (Throwable e) {
log.warn("Error executing jstack on Server 1", e);
}
}
public void produce(ConnectionFactory factory, int index, CountDownLatch latch) { public void produce(ConnectionFactory factory, int index, CountDownLatch latch) {
try { try {
@ -238,7 +254,7 @@ public class SoakPagingTest extends SmokeTestBase {
latch.countDown(); latch.countDown();
connection.start(); connection.start();
System.out.println("Producer" + index + " started"); log.debug("Producer" + index + " started");
final Session session; final Session session;
@ -272,7 +288,7 @@ public class SoakPagingTest extends SmokeTestBase {
produced.incrementAndGet(); produced.incrementAndGet();
i++; i++;
if (i % 100 == 0) { if (i % 100 == 0) {
System.out.println("Producer" + index + " published " + i + " messages"); log.debug("Producer" + index + " published " + i + " messages");
if (transaction) { if (transaction) {
session.commit(); session.commit();
} }
@ -317,17 +333,17 @@ public class SoakPagingTest extends SmokeTestBase {
latch.countDown(); latch.countDown();
connection.start(); connection.start();
System.out.println("Consumer" + index + " started"); log.debug("Consumer" + index + " started");
int i = 0; int i = 0;
while (true) { while (true) {
Message m = messageConsumer.receive(1000); Message m = messageConsumer.receive(1000);
consumed.incrementAndGet(); consumed.incrementAndGet();
if (m == null) if (m == null)
System.out.println("Consumer" + index + "received null"); log.debug("Consumer" + index + "received null");
i++; i++;
if (i % 100 == 0) { if (i % 100 == 0) {
System.out.println("Consumer" + index + "received " + i + " messages"); log.debug("Consumer" + index + "received " + i + " messages");
if (transaction) { if (transaction) {
session.commit(); session.commit();
} }