diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 405adf2a54..82e88707f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -4327,15 +4327,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { MetricsManager metricsManager = server.getMetricsManager(); metricsManager.registerQueueGauge(addressName, queueName, (builder) -> { - builder.register(QueueMetricNames.MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION); - builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getMessageCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getPersistentSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurablePersistentSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION); builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION); builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java index cab34b22e4..26fbd2ba98 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Before; @@ -198,6 +199,36 @@ public class MetricsPluginTest extends ActiveMQTestBase { checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0); } + @Test + public void testMessageCountWithPaging() throws Exception { + final String data = "Simple Text " + UUID.randomUUID().toString(); + final String queueName = "simpleQueue"; + final String addressName = "simpleAddress"; + + server.getAddressSettingsRepository().getMatch(addressName).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeBytes(1024 * 10).setPageSizeBytes(1024 * 5); + + session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST)); + ClientProducer producer = session.createProducer(addressName); + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeString(data); + long messageCount = 0; + while (!server.getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) { + producer.send(message); + messageCount++; + } + + Wait.assertEquals(messageCount, server.locateQueue(queueName)::getMessageCount, 2000, 100); + checkMetric(getMetrics(), "artemis.message.count", "queue", queueName, Double.valueOf(messageCount)); + + for (int i = 0; i < messageCount; i++) { + producer.send(message); + } + producer.close(); + + Wait.assertEquals(messageCount * 2, server.locateQueue(queueName)::getMessageCount, 2000, 100); + checkMetric(getMetrics(), "artemis.message.count", "queue", queueName, Double.valueOf(messageCount * 2)); + } + public Map getMetrics() { Map metrics = new HashMap<>(); List meters = server.getMetricsManager().getMeterRegistry().getMeters();