This commit is contained in:
Clebert Suconic 2019-07-22 09:33:21 -04:00
commit b86b86c40f
4 changed files with 125 additions and 38 deletions

View File

@ -3872,7 +3872,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (server != null && server.getMetricsManager() != null) {
MetricsManager metricsManager = server.getMetricsManager();
metricsManager.registerQueueGauge(queueName, addressName, (builder) -> {
metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {
builder.register(QueueMetricNames.MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
@ -3880,7 +3880,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getMessageCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getPersistentSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurablePersistentSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);

View File

@ -32,7 +32,7 @@ public class QueueMetricNames {
public static final String SCHEDULED_MESSAGE_COUNT = "scheduled.message.count";
public static final String SCHEDULED_DURABLE_MESSAGE_COUNT = "scheduled.durable.message.count";
public static final String SCHEDULED_PERSISTENT_SIZE = "scheduled.persistent.size";
public static final String SCHEDULED_DURABLE_PERSISTENT_SIZE = "scheduled_durable.persistent.size";
public static final String SCHEDULED_DURABLE_PERSISTENT_SIZE = "scheduled.durable.persistent.size";
public static final String MESSAGES_ACKNOWLEDGED = "messages.acknowledged";
public static final String MESSAGES_ADDED = "messages.added";

View File

@ -169,6 +169,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>

View File

@ -20,7 +20,10 @@ package org.apache.activemq.artemis.tests.integration.plugin;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.UUID;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
@ -39,6 +42,8 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.containsInAnyOrder;
public class MetricsPluginTest extends ActiveMQTestBase {
protected ActiveMQServer server;
@ -58,6 +63,86 @@ public class MetricsPluginTest extends ActiveMQTestBase {
session = addClientSession(sf.createSession(false, true, true));
}
@Test
public void testForArtemisMetricsPresence() throws Exception {
class Metric {
public final String name;
public final String description;
public final Double value;
private Metric(String name, String description, Double value) {
this.name = name;
this.description = description;
this.value = value;
}
@Override
public String toString() {
return name + ": " + value + " (" + description + ")";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Metric metric = (Metric) o;
return Objects.equals(name, metric.name) &&
Objects.equals(description, metric.description) &&
Objects.equals(value, metric.value);
}
@Override
public int hashCode() {
return Objects.hash(name, description, value);
}
}
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";
session.createQueue(addressName, RoutingType.ANYCAST, queueName, null, true);
Map<Meter.Id, Double> metrics = getMetrics();
List<Metric> artemisMetrics = metrics.entrySet().stream()
.map(entry -> new Metric(
entry.getKey().getName(),
entry.getKey().getDescription(),
entry.getValue()))
.filter(metric -> metric.name.startsWith("artemis"))
.collect(Collectors.toList());
// for (Metric metric : artemisMetrics) {
// IntegrationTestLogger.LOGGER.info(metric);
// }
assertThat(artemisMetrics, containsInAnyOrder(
// artemis.(un)routed.message.count is present twice, because of activemq.notifications address
new Metric("artemis.address.memory.usage", "Memory used by all the addresses on broker for in-memory messages", 0.0),
new Metric("artemis.connection.count", "Number of clients connected to this server", 1.0),
new Metric("artemis.consumer.count", "number of consumers consuming messages from this queue", 0.0),
new Metric("artemis.delivering.durable.message.count", "number of durable messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.delivering.durable.persistent.size", "persistent size of durable messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.delivering.message.count", "number of messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.delivering.persistent_size", "persistent size of messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.durable.message.count", "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.durable.persistent.size", "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.message.count", "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.messages.acknowledged", "number of messages acknowledged from this queue since it was created", 0.0),
new Metric("artemis.messages.added", "number of messages added to this queue since it was created", 0.0),
new Metric("artemis.messages.expired", "number of messages expired from this queue since it was created", 0.0),
new Metric("artemis.messages.killed", "number of messages removed from this queue since it was created due to exceeding the max delivery attempts", 0.0),
new Metric("artemis.persistent.size", "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.routed.message.count", "number of messages routed to one or more bindings", 0.0),
new Metric("artemis.routed.message.count", "number of messages routed to one or more bindings", 0.0),
new Metric("artemis.scheduled.durable.message.count", "number of durable scheduled messages in this queue", 0.0),
new Metric("artemis.scheduled.durable.persistent.size", "persistent size of durable scheduled messages in this queue", 0.0),
new Metric("artemis.scheduled.message.count", "number of scheduled messages in this queue", 0.0),
new Metric("artemis.scheduled.persistent.size", "persistent size of scheduled messages in this queue", 0.0),
new Metric("artemis.total.connection.count", "Number of clients which have connected to this server since it was started", 1.0),
new Metric("artemis.unrouted.message.count", "number of messages not routed to any bindings", 0.0),
new Metric("artemis.unrouted.message.count", "number of messages not routed to any bindings", 2.0)
));
}
@Test
public void testForBasicMetricsPresenceAndValue() throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
@ -71,16 +156,16 @@ public class MetricsPluginTest extends ActiveMQTestBase {
producer.send(message);
producer.close();
Map<String, Double> metrics = getMetrics();
Map<Meter.Id, Double> metrics = getMetrics();
checkMetric(metrics, "'artemis.message.count'", queueName, 1.0);
checkMetric(metrics, "'artemis.messages.added'", queueName, 1.0);
checkMetric(metrics, "'artemis.messages.acknowledged'", queueName, 0.0);
checkMetric(metrics, "'artemis.durable.message.count'", queueName, 1.0);
checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.routed.message.count'", addressName, 1.0);
checkMetric(metrics, "'artemis.unrouted.message.count'", addressName, 0.0);
checkMetric(metrics, "'artemis.consumer.count'", queueName, 0.0);
checkMetric(metrics, "artemis.message.count", "queue", queueName, 1.0);
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0);
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 0.0);
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 1.0);
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0);
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0);
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0);
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0);
ClientConsumer consumer = session.createConsumer(queueName);
session.start();
@ -88,8 +173,8 @@ public class MetricsPluginTest extends ActiveMQTestBase {
assertNotNull(message);
metrics = getMetrics();
checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 1.0);
checkMetric(metrics, "'artemis.consumer.count'", queueName, 1.0);
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 1.0);
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 1.0);
message.acknowledge();
assertEquals(data, message.getBodyBuffer().readString());
@ -101,41 +186,37 @@ public class MetricsPluginTest extends ActiveMQTestBase {
metrics = getMetrics();
checkMetric(metrics, "'artemis.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.messages.added'", queueName, 1.0);
checkMetric(metrics, "'artemis.messages.acknowledged'", queueName, 1.0);
checkMetric(metrics, "'artemis.durable.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.routed.message.count'", addressName, 1.0);
checkMetric(metrics, "'artemis.unrouted.message.count'", addressName, 0.0);
checkMetric(metrics, "'artemis.consumer.count'", queueName, 0.0);
checkMetric(metrics, "artemis.message.count", "queue", queueName, 0.0);
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0);
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 1.0);
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 0.0);
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0);
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0);
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0);
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0);
}
public Map<String, Double> getMetrics() {
Map<String, Double> metrics = new HashMap<>();
public Map<Meter.Id, Double> getMetrics() {
Map<Meter.Id, Double> metrics = new HashMap<>();
List<Meter> meters = server.getMetricsManager().getMeterRegistry().getMeters();
assertTrue(meters.size() > 0);
for (Meter meter : meters) {
Iterable<Measurement> measurements = meter.measure();
for (Measurement measurement : measurements) {
metrics.put(meter.getId().toString(), measurement.getValue());
// if (meter.getId().getName().startsWith("artemis")) {
// IntegrationTestLogger.LOGGER.info(meter.getId().toString() + ": " + measurement.getValue() + " (" + meter.getId().getDescription() + ")");
// }
metrics.put(meter.getId(), measurement.getValue());
}
}
return metrics;
}
public void checkMetric(Map<String, Double> metrics, String metric, String tag, Double value) {
boolean contains = false;
for (Map.Entry<String, Double> entry : metrics.entrySet()) {
if (entry.getKey().contains(metric) && entry.getKey().contains(tag)) {
contains = true;
assertEquals(metric + " not equal", value, entry.getValue(), 0);
break;
}
}
assertTrue(contains);
public void checkMetric(Map<Meter.Id, Double> metrics, String metric, String tag, String tagValue, Double expectedValue) {
OptionalDouble actualValue = metrics.entrySet().stream()
.filter(entry -> metric.equals(entry.getKey().getName()))
.filter(entry -> tagValue.equals(entry.getKey().getTag(tag)))
.mapToDouble(Map.Entry::getValue)
.findFirst();
assertTrue(metric + " for " + tag + " " + tagValue + " not present", actualValue.isPresent());
assertEquals(metric + " not equal", expectedValue, actualValue.getAsDouble(), 0);
}
}