diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index af172c8dad..9c051140e2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -20,6 +20,7 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -31,9 +32,13 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.Set; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase { session.close(); } + @Test + public void testMultipleConsumersOnSharedQueue() throws Throwable { + if (!isNetty() || this.durable) { + return; + } + final boolean durable = false; + final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); + final int forks = 100; + final int queues = forks; + final int runs = 1; + final int messages = 1; + final ConnectionFactory factorySend = createFactory(1); + final AtomicLongArray receivedMessages = new AtomicLongArray(forks); + final Thread[] producersRunners = new Thread[forks]; + final Thread[] consumersRunners = new Thread[forks]; + //parties are forks (1 producer 1 consumer) + 1 controller in the main test thread + final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1); + final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1); + + final int messagesSent = forks * messages; + final AtomicInteger messagesRecieved = new AtomicInteger(0); + + for (int i = 0; i < forks; i++) { + final int forkIndex = i; + final String queueName = "q_" + (forkIndex % queues); + final Thread producerRunner = new Thread(() -> { + try (Connection connection = factorySend.createConnection()) { + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + final javax.jms.Queue queue = session.createQueue(queueName); + try (MessageProducer producer = session.createProducer(queue)) { + producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + for (int r = 0; r < runs; r++) { + onStartRun.await(); + for (int m = 0; m < messages; m++) { + final BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeInt(forkIndex); + producer.send(bytesMessage); + } + onFinishRun.await(); + } + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + } + } catch (JMSException e) { + e.printStackTrace(); + } + }); + + producerRunner.setDaemon(true); + + final Thread consumerRunner = new Thread(() -> { + try (Connection connection = factorySend.createConnection()) { + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + final javax.jms.Queue queue = session.createQueue(queueName); + try (MessageConsumer consumer = session.createConsumer(queue)) { + for (int r = 0; r < runs; r++) { + onStartRun.await(); + while (messagesRecieved.get() != messagesSent) { + final BytesMessage receivedMessage = (BytesMessage) consumer.receive(1000); + if (receivedMessage != null) { + final int receivedConsumerIndex = receivedMessage.readInt(); + receivedMessages.getAndIncrement(receivedConsumerIndex); + messagesRecieved.incrementAndGet(); + } + } + onFinishRun.await(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + } catch (JMSException e) { + e.printStackTrace(); + } + }); + consumerRunner.setDaemon(true); + consumersRunners[forkIndex] = consumerRunner; + producersRunners[forkIndex] = producerRunner; + } + Stream.of(consumersRunners).forEach(Thread::start); + Stream.of(producersRunners).forEach(Thread::start); + final long messagesPerRun = (forks * messages); + for (int r = 0; r < runs; r++) { + onStartRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + System.out.println("started run " + r); + final long start = System.currentTimeMillis(); + onFinishRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + final long elapsedMillis = System.currentTimeMillis() - start; + System.out.println((messagesPerRun * 1000L) / elapsedMillis + " msg/sec"); + } + Stream.of(producersRunners).forEach(runner -> { + try { + runner.join(TIMEOUT_MILLIS * runs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Stream.of(producersRunners).forEach(Thread::interrupt); + Stream.of(consumersRunners).forEach(runner -> { + try { + runner.join(TIMEOUT_MILLIS * runs); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Stream.of(consumersRunners).forEach(Thread::interrupt); + for (int i = 0; i < forks; i++) { + Assert.assertEquals("The consumer " + i + " must receive all the messages sent.", messages * runs, receivedMessages.get(i)); + } + } }