This closes #3109
This commit is contained in:
commit
279d31a31c
|
@ -4327,15 +4327,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
MetricsManager metricsManager = server.getMetricsManager();
|
MetricsManager metricsManager = server.getMetricsManager();
|
||||||
|
|
||||||
metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {
|
metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {
|
||||||
builder.register(QueueMetricNames.MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
|
builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
|
builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
|
builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
|
builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
|
||||||
|
|
||||||
builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getMessageCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
|
builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
|
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getPersistentSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
|
builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurablePersistentSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
|
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
|
||||||
|
|
||||||
builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
|
builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
|
||||||
builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
|
builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
|
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -198,6 +199,36 @@ public class MetricsPluginTest extends ActiveMQTestBase {
|
||||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0);
|
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageCountWithPaging() throws Exception {
|
||||||
|
final String data = "Simple Text " + UUID.randomUUID().toString();
|
||||||
|
final String queueName = "simpleQueue";
|
||||||
|
final String addressName = "simpleAddress";
|
||||||
|
|
||||||
|
server.getAddressSettingsRepository().getMatch(addressName).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeBytes(1024 * 10).setPageSizeBytes(1024 * 5);
|
||||||
|
|
||||||
|
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
ClientProducer producer = session.createProducer(addressName);
|
||||||
|
ClientMessage message = session.createMessage(true);
|
||||||
|
message.getBodyBuffer().writeString(data);
|
||||||
|
long messageCount = 0;
|
||||||
|
while (!server.getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) {
|
||||||
|
producer.send(message);
|
||||||
|
messageCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(messageCount, server.locateQueue(queueName)::getMessageCount, 2000, 100);
|
||||||
|
checkMetric(getMetrics(), "artemis.message.count", "queue", queueName, Double.valueOf(messageCount));
|
||||||
|
|
||||||
|
for (int i = 0; i < messageCount; i++) {
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
producer.close();
|
||||||
|
|
||||||
|
Wait.assertEquals(messageCount * 2, server.locateQueue(queueName)::getMessageCount, 2000, 100);
|
||||||
|
checkMetric(getMetrics(), "artemis.message.count", "queue", queueName, Double.valueOf(messageCount * 2));
|
||||||
|
}
|
||||||
|
|
||||||
public Map<Meter.Id, Double> getMetrics() {
|
public Map<Meter.Id, Double> getMetrics() {
|
||||||
Map<Meter.Id, Double> metrics = new HashMap<>();
|
Map<Meter.Id, Double> metrics = new HashMap<>();
|
||||||
List<Meter> meters = server.getMetricsManager().getMeterRegistry().getMeters();
|
List<Meter> meters = server.getMetricsManager().getMeterRegistry().getMeters();
|
||||||
|
|
Loading…
Reference in New Issue