ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress

This commit is contained in:
Francesco Nigro 2017-11-02 10:51:43 +01:00 committed by Clebert Suconic
parent c2a21c9743
commit 8bf879f156
1 changed files with 120 additions and 0 deletions
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client

View File

@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage; import javax.jms.MapMessage;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
@ -31,9 +32,13 @@ import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase {
session.close(); session.close();
} }
@Test
public void testMultipleConsumersOnSharedQueue() throws Throwable {
if (!isNetty() || this.durable) {
return;
}
final boolean durable = false;
final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
final int forks = 100;
final int queues = forks;
final int runs = 1;
final int messages = 1;
final ConnectionFactory factorySend = createFactory(1);
final AtomicLongArray receivedMessages = new AtomicLongArray(forks);
final Thread[] producersRunners = new Thread[forks];
final Thread[] consumersRunners = new Thread[forks];
//parties are forks (1 producer 1 consumer) + 1 controller in the main test thread
final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1);
final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1);
final int messagesSent = forks * messages;
final AtomicInteger messagesRecieved = new AtomicInteger(0);
for (int i = 0; i < forks; i++) {
final int forkIndex = i;
final String queueName = "q_" + (forkIndex % queues);
final Thread producerRunner = new Thread(() -> {
try (Connection connection = factorySend.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
final javax.jms.Queue queue = session.createQueue(queueName);
try (MessageProducer producer = session.createProducer(queue)) {
producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
for (int r = 0; r < runs; r++) {
onStartRun.await();
for (int m = 0; m < messages; m++) {
final BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeInt(forkIndex);
producer.send(bytesMessage);
}
onFinishRun.await();
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
});
producerRunner.setDaemon(true);
final Thread consumerRunner = new Thread(() -> {
try (Connection connection = factorySend.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
final javax.jms.Queue queue = session.createQueue(queueName);
try (MessageConsumer consumer = session.createConsumer(queue)) {
for (int r = 0; r < runs; r++) {
onStartRun.await();
while (messagesRecieved.get() != messagesSent) {
final BytesMessage receivedMessage = (BytesMessage) consumer.receive(1000);
if (receivedMessage != null) {
final int receivedConsumerIndex = receivedMessage.readInt();
receivedMessages.getAndIncrement(receivedConsumerIndex);
messagesRecieved.incrementAndGet();
}
}
onFinishRun.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
});
consumerRunner.setDaemon(true);
consumersRunners[forkIndex] = consumerRunner;
producersRunners[forkIndex] = producerRunner;
}
Stream.of(consumersRunners).forEach(Thread::start);
Stream.of(producersRunners).forEach(Thread::start);
final long messagesPerRun = (forks * messages);
for (int r = 0; r < runs; r++) {
onStartRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
System.out.println("started run " + r);
final long start = System.currentTimeMillis();
onFinishRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
final long elapsedMillis = System.currentTimeMillis() - start;
System.out.println((messagesPerRun * 1000L) / elapsedMillis + " msg/sec");
}
Stream.of(producersRunners).forEach(runner -> {
try {
runner.join(TIMEOUT_MILLIS * runs);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Stream.of(producersRunners).forEach(Thread::interrupt);
Stream.of(consumersRunners).forEach(runner -> {
try {
runner.join(TIMEOUT_MILLIS * runs);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Stream.of(consumersRunners).forEach(Thread::interrupt);
for (int i = 0; i < forks; i++) {
Assert.assertEquals("The consumer " + i + " must receive all the messages sent.", messages * runs, receivedMessages.get(i));
}
}
} }