This commit is contained in:
Clebert Suconic 2020-05-09 23:44:56 -04:00
commit 56f2654cbb
1 changed files with 36 additions and 17 deletions

View File

@ -27,15 +27,14 @@ import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -43,9 +42,6 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class SoakPagingTest extends SmokeTestBase { public class SoakPagingTest extends SmokeTestBase {
@Rule
public RetryRule retryRule = new RetryRule(1);
public static final int LAG_CONSUMER_TIME = 1000; public static final int LAG_CONSUMER_TIME = 1000;
public static final int TIME_RUNNING = 4000; public static final int TIME_RUNNING = 4000;
public static final int CLIENT_KILLS = 2; public static final int CLIENT_KILLS = 2;
@ -91,7 +87,8 @@ public class SoakPagingTest extends SmokeTestBase {
static final int consumer_threads = 20; static final int consumer_threads = 20;
static final int producer_threads = 20; static final int producer_threads = 20;
static AtomicInteger j = new AtomicInteger(0); static AtomicInteger producer_count = new AtomicInteger(0);
static AtomicInteger consumer_count = new AtomicInteger(0);
private static ConnectionFactory createConnectionFactory(String protocol, String uri) { private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) { if (protocol.toUpperCase().equals("OPENWIRE")) {
@ -132,12 +129,15 @@ public class SoakPagingTest extends SmokeTestBase {
final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port); final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port);
CountDownLatch producersLatch = new CountDownLatch(producer_threads);
CountDownLatch consumersLatch = new CountDownLatch(consumer_threads);
for (int i = 0; i < producer_threads; i++) { for (int i = 0; i < producer_threads; i++) {
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx); SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
app.produce(factory); app.produce(factory, producer_count.incrementAndGet(), producersLatch);
} }
}); });
t.start(); t.start();
@ -150,16 +150,27 @@ public class SoakPagingTest extends SmokeTestBase {
@Override @Override
public void run() { public void run() {
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx); SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
app.consume(factory, j.getAndIncrement()); app.consume(factory, consumer_count.getAndIncrement(), consumersLatch);
} }
}); });
t.start(); t.start();
} }
System.out.println("Awaiting producers...");
producersLatch.await();
System.out.println("Awaiting consumers...");
consumersLatch.await();
System.out.println("Awaiting timeout...");
Thread.sleep(time); Thread.sleep(time);
System.exit(consumed.get() > 0 ? 1 : 0); int exitStatus = consumed.get() > 0 ? 1 : 0;
} catch (Throwable e) { System.out.println("Exiting with the status: " + exitStatus);
e.printStackTrace(); System.exit(exitStatus);
} catch (Throwable t) {
System.err.println("Exiting with the status 0. Reason: " + t);
t.printStackTrace();
System.exit(0); System.exit(0);
} }
@ -178,7 +189,7 @@ public class SoakPagingTest extends SmokeTestBase {
} }
} }
public void produce(ConnectionFactory factory) { public void produce(ConnectionFactory factory, int index, CountDownLatch latch) {
try { try {
StringBuffer bufferlarge = new StringBuffer(); StringBuffer bufferlarge = new StringBuffer();
@ -187,7 +198,11 @@ public class SoakPagingTest extends SmokeTestBase {
} }
Connection connection = factory.createConnection("admin", "admin"); Connection connection = factory.createConnection("admin", "admin");
latch.countDown();
connection.start(); connection.start();
System.out.println("Producer" + index + " started");
final Session session; final Session session;
if (transaction) { if (transaction) {
@ -220,18 +235,19 @@ public class SoakPagingTest extends SmokeTestBase {
produced.incrementAndGet(); produced.incrementAndGet();
i++; i++;
if (i % 100 == 0) { if (i % 100 == 0) {
System.out.println("Published " + i + " messages"); System.out.println("Producer" + index + " published " + i + " messages");
if (transaction) { if (transaction) {
session.commit(); session.commit();
} }
} }
} }
} catch (Exception e) { } catch (Exception e) {
System.err.println("Error on Producer" + index + ": " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
} }
} }
public void consume(ConnectionFactory factory, int j) { public void consume(ConnectionFactory factory, int index, CountDownLatch latch) {
try { try {
Connection connection = factory.createConnection("admin", "admin"); Connection connection = factory.createConnection("admin", "admin");
@ -251,7 +267,7 @@ public class SoakPagingTest extends SmokeTestBase {
address = session.createTopic(destination); address = session.createTopic(destination);
} }
String consumerId = "ss" + (j % 5); String consumerId = "ss" + (index % 5);
MessageConsumer messageConsumer; MessageConsumer messageConsumer;
if (protocol.equals("shared")) { if (protocol.equals("shared")) {
@ -262,23 +278,26 @@ public class SoakPagingTest extends SmokeTestBase {
if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME); if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME);
latch.countDown();
connection.start(); connection.start();
System.out.println("Consumer" + index + " started");
int i = 0; int i = 0;
while (true) { while (true) {
Message m = messageConsumer.receive(1000); Message m = messageConsumer.receive(1000);
consumed.incrementAndGet(); consumed.incrementAndGet();
if (m == null) if (m == null)
System.out.println("receive() returned null"); System.out.println("Consumer" + index + "received null");
i++; i++;
if (i % 100 == 0) { if (i % 100 == 0) {
System.out.println("Consumed " + i + " messages"); System.out.println("Consumer" + index + "received " + i + " messages");
if (transaction) { if (transaction) {
session.commit(); session.commit();
} }
} }
} }
} catch (Exception e) { } catch (Exception e) {
System.err.println("Error on Consumer" + index + ": " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
} }
} }