diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java index 521cce61f1..b09388b32f 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java @@ -27,15 +27,14 @@ import javax.jms.Session; import javax.jms.Topic; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; -import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,9 +42,6 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class SoakPagingTest extends SmokeTestBase { - @Rule - public RetryRule retryRule = new RetryRule(1); - public static final int LAG_CONSUMER_TIME = 1000; public static final int TIME_RUNNING = 4000; public static final int CLIENT_KILLS = 2; @@ -91,7 +87,8 @@ public class SoakPagingTest extends SmokeTestBase { static final int consumer_threads = 20; static final int producer_threads = 20; - static AtomicInteger j = new AtomicInteger(0); + static AtomicInteger producer_count = new AtomicInteger(0); + static AtomicInteger consumer_count = new AtomicInteger(0); private static ConnectionFactory createConnectionFactory(String protocol, String uri) { if (protocol.toUpperCase().equals("OPENWIRE")) { @@ -132,12 +129,15 @@ public class SoakPagingTest extends SmokeTestBase { final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port); + CountDownLatch producersLatch = new CountDownLatch(producer_threads); + CountDownLatch consumersLatch = new CountDownLatch(consumer_threads); + for (int i = 0; i < producer_threads; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx); - app.produce(factory); + app.produce(factory, producer_count.incrementAndGet(), producersLatch); } }); t.start(); @@ -150,16 +150,27 @@ public class SoakPagingTest extends SmokeTestBase { @Override public void run() { SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx); - app.consume(factory, j.getAndIncrement()); + app.consume(factory, consumer_count.getAndIncrement(), consumersLatch); } }); t.start(); } + + System.out.println("Awaiting producers..."); + producersLatch.await(); + + System.out.println("Awaiting consumers..."); + consumersLatch.await(); + + System.out.println("Awaiting timeout..."); Thread.sleep(time); - System.exit(consumed.get() > 0 ? 1 : 0); - } catch (Throwable e) { - e.printStackTrace(); + int exitStatus = consumed.get() > 0 ? 1 : 0; + System.out.println("Exiting with the status: " + exitStatus); + System.exit(exitStatus); + } catch (Throwable t) { + System.err.println("Exiting with the status 0. Reason: " + t); + t.printStackTrace(); System.exit(0); } @@ -178,7 +189,7 @@ public class SoakPagingTest extends SmokeTestBase { } } - public void produce(ConnectionFactory factory) { + public void produce(ConnectionFactory factory, int index, CountDownLatch latch) { try { StringBuffer bufferlarge = new StringBuffer(); @@ -187,7 +198,11 @@ public class SoakPagingTest extends SmokeTestBase { } Connection connection = factory.createConnection("admin", "admin"); + latch.countDown(); + connection.start(); + System.out.println("Producer" + index + " started"); + final Session session; if (transaction) { @@ -220,18 +235,19 @@ public class SoakPagingTest extends SmokeTestBase { produced.incrementAndGet(); i++; if (i % 100 == 0) { - System.out.println("Published " + i + " messages"); + System.out.println("Producer" + index + " published " + i + " messages"); if (transaction) { session.commit(); } } } } catch (Exception e) { + System.err.println("Error on Producer" + index + ": " + e.getMessage()); e.printStackTrace(); } } - public void consume(ConnectionFactory factory, int j) { + public void consume(ConnectionFactory factory, int index, CountDownLatch latch) { try { Connection connection = factory.createConnection("admin", "admin"); @@ -251,7 +267,7 @@ public class SoakPagingTest extends SmokeTestBase { address = session.createTopic(destination); } - String consumerId = "ss" + (j % 5); + String consumerId = "ss" + (index % 5); MessageConsumer messageConsumer; if (protocol.equals("shared")) { @@ -262,23 +278,26 @@ public class SoakPagingTest extends SmokeTestBase { if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME); + latch.countDown(); connection.start(); + System.out.println("Consumer" + index + " started"); int i = 0; while (true) { Message m = messageConsumer.receive(1000); consumed.incrementAndGet(); if (m == null) - System.out.println("receive() returned null"); + System.out.println("Consumer" + index + "received null"); i++; if (i % 100 == 0) { - System.out.println("Consumed " + i + " messages"); + System.out.println("Consumer" + index + "received " + i + " messages"); if (transaction) { session.commit(); } } } } catch (Exception e) { + System.err.println("Error on Consumer" + index + ": " + e.getMessage()); e.printStackTrace(); } }