diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 35dd5ed47a..1f1f4fdd23 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -177,13 +177,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final ScheduledDeliveryHandler scheduledDeliveryHandler; - private long messagesAdded; + private AtomicLong messagesAdded = new AtomicLong(0); - private long messagesAcknowledged; + private AtomicLong messagesAcknowledged = new AtomicLong(0); - private long messagesExpired; + private AtomicLong messagesExpired = new AtomicLong(0); - private long messagesKilled; + private AtomicLong messagesKilled = new AtomicLong(0); protected final AtomicInteger deliveringCount = new AtomicInteger(0); @@ -637,7 +637,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { directDeliver = false; if (!ref.isPaged()) { - messagesAdded++; + messagesAdded.incrementAndGet(); } } @@ -702,7 +702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) { synchronized (this) { if (!ref.isPaged()) { - messagesAdded++; + messagesAdded.incrementAndGet(); } } @@ -1132,11 +1132,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (reason == AckReason.EXPIRED) { - messagesExpired++; + messagesExpired.incrementAndGet(); } else if (reason == AckReason.KILLED) { - messagesKilled++; + messagesKilled.incrementAndGet(); } else { - messagesAcknowledged++; + messagesAcknowledged.incrementAndGet(); } if (server != null) { @@ -1170,11 +1170,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (reason == AckReason.EXPIRED) { - messagesExpired++; + messagesExpired.incrementAndGet(); } else if (reason == AckReason.KILLED) { - messagesKilled++; + messagesKilled.incrementAndGet(); } else { - messagesAcknowledged++; + messagesAcknowledged.incrementAndGet(); } if (server != null) { @@ -1195,7 +1195,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // https://issues.jboss.org/browse/HORNETQ-609 incDelivering(); - messagesAcknowledged++; + messagesAcknowledged.incrementAndGet(); } private RefsOperation getRefsOperation(final Transaction tx) { @@ -1314,7 +1314,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void incrementMesssagesAdded() { - messagesAdded++; + messagesAdded.incrementAndGet(); } @Override @@ -1332,25 +1332,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public long getMessagesAdded() { if (pageSubscription != null) { - return messagesAdded + pageSubscription.getCounter().getValueAdded(); + return messagesAdded.get() + pageSubscription.getCounter().getValueAdded(); } else { - return messagesAdded; + return messagesAdded.get(); } } @Override public long getMessagesAcknowledged() { - return messagesAcknowledged; + return messagesAcknowledged.get(); } @Override public long getMessagesExpired() { - return messagesExpired; + return messagesExpired.get(); } @Override public long getMessagesKilled() { - return messagesKilled; + return messagesKilled.get(); } @Override @@ -2057,7 +2057,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { internalAddTail(ref); if (!ref.isPaged()) { - messagesAdded++; + messagesAdded.incrementAndGet(); } if (added++ > MAX_DELIVERIES_IN_LOOP) { @@ -2734,7 +2734,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { groups.put(groupID, consumer); } - messagesAdded++; + messagesAdded.incrementAndGet(); deliveriesInTransit.countUp(); proceedDeliver(consumer, ref); @@ -2911,22 +2911,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public synchronized void resetMessagesAdded() { - messagesAdded = 0; + messagesAdded.set(0); } @Override public synchronized void resetMessagesAcknowledged() { - messagesAcknowledged = 0; + messagesAcknowledged.set(0); } @Override public synchronized void resetMessagesExpired() { - messagesExpired = 0; + messagesExpired.set(0); } @Override public synchronized void resetMessagesKilled() { - messagesKilled = 0; + messagesKilled.set(0); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java index 6647c43b84..ac09745e55 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java @@ -20,6 +20,7 @@ import javax.management.MBeanServer; import javax.management.MBeanServerFactory; import javax.management.ObjectName; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -104,6 +105,14 @@ public abstract class ManagementTestBase extends ActiveMQTestBase { return queueControl; } + protected QueueControl createManagementControl(final SimpleString address, + final SimpleString queue, + final RoutingType routingType) throws Exception { + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, routingType, mbeanServer); + + return queueControl; + } + protected long getMessageCount(QueueControl control) throws Exception { control.flushExecutor(); return control.getMessageCount(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 267549f644..404f466959 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -24,11 +24,14 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -48,7 +51,6 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManage import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.utils.Base64; @@ -605,6 +607,81 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testMessagesAddedAndMessagesAcknowledged() throws Exception { + final int THREAD_COUNT = 5; + final int MSG_COUNT = 1000; + + CountDownLatch producerCountDown = new CountDownLatch(THREAD_COUNT); + CountDownLatch consumerCountDown = new CountDownLatch(THREAD_COUNT); + + ExecutorService producerExecutor = Executors.newFixedThreadPool(THREAD_COUNT); + ExecutorService consumerExecutor = Executors.newFixedThreadPool(THREAD_COUNT); + + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + try { + session.createQueue(address, RoutingType.ANYCAST, queue, null, false); + + for (int i = 0; i < THREAD_COUNT; i++) { + producerExecutor.submit(() -> { + try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientProducer producer = session.createProducer(address)) { + for (int j = 0; j < MSG_COUNT; j++) { + producer.send(session.createMessage(false)); + Thread.sleep(5); + } + producerCountDown.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + for (int i = 0; i < THREAD_COUNT; i++) { + consumerExecutor.submit(() -> { + try (ClientSessionFactory sf = locator.createSessionFactory(); ClientSession session = sf.createSession(false, true, false); ClientConsumer consumer = session.createConsumer(queue)) { + session.start(); + for (int j = 0; j < MSG_COUNT; j++) { + ClientMessage message = consumer.receive(500); + Assert.assertNotNull(message); + message.acknowledge(); + } + session.commit(); + consumerCountDown.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + producerCountDown.await(30, TimeUnit.SECONDS); + consumerCountDown.await(30, TimeUnit.SECONDS); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + Assert.assertEquals(0, queueControl.getMessageCount()); + Assert.assertEquals(0, queueControl.getConsumerCount()); + Assert.assertEquals(0, queueControl.getDeliveringCount()); + Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAdded()); + Assert.assertEquals(THREAD_COUNT * MSG_COUNT, queueControl.getMessagesAcknowledged()); + + session.deleteQueue(queue); + } finally { + shutdownExecutor(producerExecutor); + shutdownExecutor(consumerExecutor); + } + } + + private void shutdownExecutor(ExecutorService executor) { + try { + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } finally { + executor.shutdownNow(); + } + } + @Test public void testListMessagesAsJSONWithNullFilter() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -2153,14 +2230,6 @@ public class QueueControlTest extends ManagementTestBase { session.start(); } - @Override - protected QueueControl createManagementControl(final SimpleString address, - final SimpleString queue) throws Exception { - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, mbeanServer); - - return queueControl; - } - protected long getFirstMessageId(final QueueControl queueControl) throws Exception { JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON()); JsonObject object = (JsonObject) array.get(0);