This closes #3131
This commit is contained in:
commit
5a4057dbc0
|
@ -291,6 +291,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String RETROACTIVE_MESSAGE_COUNT = "retroactive-message-count";
|
||||
|
||||
private static final String ENABLE_METRICS = "enable-metrics";
|
||||
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
|
@ -1215,6 +1217,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setExpiryQueuePrefix(new SimpleString(getTrimmedTextContent(child)));
|
||||
} else if (EXPIRY_QUEUE_SUFFIX_NODE_NAME.equalsIgnoreCase(name)) {
|
||||
addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
|
||||
} else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
|
||||
addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
|
||||
}
|
||||
}
|
||||
return setting;
|
||||
|
|
|
@ -2837,7 +2837,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
* are not required to be included in the OSGi bundle and the Micrometer jars apparently don't support OSGi.
|
||||
*/
|
||||
if (configuration.getMetricsPlugin() != null) {
|
||||
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsPlugin());
|
||||
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsPlugin(), addressSettingsRepository);
|
||||
}
|
||||
|
||||
registerMeters();
|
||||
|
|
|
@ -31,6 +31,8 @@ import io.micrometer.core.instrument.Metrics;
|
|||
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
||||
public class MetricsManager {
|
||||
|
||||
|
@ -40,11 +42,16 @@ public class MetricsManager {
|
|||
|
||||
private final Map<String, List<Meter>> meters = new ConcurrentHashMap<>();
|
||||
|
||||
public MetricsManager(String brokerName, ActiveMQMetricsPlugin metricsPlugin) {
|
||||
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
|
||||
|
||||
public MetricsManager(String brokerName,
|
||||
ActiveMQMetricsPlugin metricsPlugin,
|
||||
HierarchicalRepository<AddressSettings> addressSettingsRepository) {
|
||||
this.brokerName = brokerName;
|
||||
meterRegistry = metricsPlugin.getRegistry();
|
||||
Metrics.globalRegistry.add(meterRegistry);
|
||||
new JvmMemoryMetrics().bindTo(meterRegistry);
|
||||
this.addressSettingsRepository = addressSettingsRepository;
|
||||
}
|
||||
|
||||
public MeterRegistry getMeterRegistry() {
|
||||
|
@ -59,7 +66,7 @@ public class MetricsManager {
|
|||
|
||||
public void registerQueueGauge(String address, String queue, Consumer<MetricGaugeBuilder> builder) {
|
||||
final MeterRegistry meterRegistry = this.meterRegistry;
|
||||
if (meterRegistry == null) {
|
||||
if (meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
|
||||
return;
|
||||
}
|
||||
final List<Gauge.Builder> newMeters = new ArrayList<>();
|
||||
|
@ -85,7 +92,7 @@ public class MetricsManager {
|
|||
|
||||
public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
|
||||
final MeterRegistry meterRegistry = this.meterRegistry;
|
||||
if (meterRegistry == null) {
|
||||
if (meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
|
||||
return;
|
||||
}
|
||||
final List<Gauge.Builder> newMeters = new ArrayList<>();
|
||||
|
|
|
@ -125,6 +125,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final SimpleString DEFAULT_DEAD_LETTER_QUEUE_SUFFIX = SimpleString.toSimpleString("");
|
||||
|
||||
public static final boolean DEFAULT_ENABLE_METRICS = true;
|
||||
|
||||
private AddressFullMessagePolicy addressFullMessagePolicy = null;
|
||||
|
||||
private Long maxSizeBytes = null;
|
||||
|
@ -247,6 +249,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private SimpleString expiryQueueSuffix = null;
|
||||
|
||||
private Boolean enableMetrics = null;
|
||||
|
||||
//from amq5
|
||||
//make it transient
|
||||
private transient Integer queuePrefetch = null;
|
||||
|
@ -310,6 +314,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.defaultGroupBuckets = other.defaultGroupBuckets;
|
||||
this.defaultGroupFirstKey = other.defaultGroupFirstKey;
|
||||
this.defaultRingSize = other.defaultRingSize;
|
||||
this.enableMetrics = other.enableMetrics;
|
||||
}
|
||||
|
||||
public AddressSettings() {
|
||||
|
@ -882,6 +887,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean isEnableMetrics() {
|
||||
return enableMetrics != null ? enableMetrics : AddressSettings.DEFAULT_ENABLE_METRICS;
|
||||
}
|
||||
|
||||
public AddressSettings setEnableMetrics(final boolean enableMetrics) {
|
||||
this.enableMetrics = enableMetrics;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* merge 2 objects in to 1
|
||||
*
|
||||
|
@ -1069,6 +1083,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (expiryQueueSuffix == null) {
|
||||
expiryQueueSuffix = merged.expiryQueueSuffix;
|
||||
}
|
||||
if (enableMetrics == null) {
|
||||
enableMetrics = merged.enableMetrics;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1273,6 +1290,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (buffer.readableBytes() > 0) {
|
||||
maxExpiryDelay = BufferHelper.readNullableLong(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
enableMetrics = BufferHelper.readNullableBoolean(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1334,7 +1355,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
SimpleString.sizeofNullableString(deadLetterQueueSuffix) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoCreateExpiryResources) +
|
||||
SimpleString.sizeofNullableString(expiryQueuePrefix) +
|
||||
SimpleString.sizeofNullableString(expiryQueueSuffix);
|
||||
SimpleString.sizeofNullableString(expiryQueueSuffix) +
|
||||
BufferHelper.sizeOfNullableBoolean(enableMetrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1456,6 +1478,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.writeNullableLong(buffer, minExpiryDelay);
|
||||
|
||||
BufferHelper.writeNullableLong(buffer, maxExpiryDelay);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, enableMetrics);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -1525,6 +1549,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((autoCreateExpiryResources == null) ? 0 : autoCreateExpiryResources.hashCode());
|
||||
result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode());
|
||||
result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode());
|
||||
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1860,6 +1885,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
} else if (!expiryQueueSuffix.equals(other.expiryQueueSuffix))
|
||||
return false;
|
||||
|
||||
if (enableMetrics == null) {
|
||||
if (other.enableMetrics != null)
|
||||
return false;
|
||||
} else if (!enableMetrics.equals(other.enableMetrics))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1985,6 +2016,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
expiryQueuePrefix +
|
||||
", expiryQueueSuffix=" +
|
||||
expiryQueueSuffix +
|
||||
", enableMetrics=" +
|
||||
enableMetrics +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3583,6 +3583,14 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="enable-metrics" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
whether or not to enable metrics for metrics plugins on the matching address
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
</xsd:all>
|
||||
|
||||
|
|
|
@ -372,6 +372,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a1").getDefaultAddressRoutingType());
|
||||
assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
|
||||
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
|
||||
assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
|
||||
|
||||
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
|
||||
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
|
||||
|
@ -406,6 +407,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
|
||||
assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
|
||||
assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount());
|
||||
assertFalse(conf.getAddressesSettings().get("a2").isEnableMetrics());
|
||||
|
||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||
|
|
|
@ -472,6 +472,7 @@
|
|||
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
||||
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
<resource-limit-settings>
|
||||
|
|
|
@ -77,5 +77,6 @@
|
|||
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
||||
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
</address-setting>
|
||||
</address-settings>
|
|
@ -620,6 +620,7 @@ that would be found in the `broker.xml` file.
|
|||
<default-address-routing-type></default-address-routing-type>
|
||||
<default-ring-size>-1</default-ring-size>
|
||||
<retroactive-message-count>0</retroactive-message-count>
|
||||
<enable-metrics>true</enable-metrics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
```
|
||||
|
@ -896,3 +897,7 @@ be set to -1. Read more about [ring queues](ring-queues.md).
|
|||
`retroactive-message-count` defines the number of messages to preserve for future
|
||||
queues created on the matching address. Defaults to 0. Read more about
|
||||
[retroactive addresses](retroactive-addresses.md).
|
||||
|
||||
`enable-metrics` determines whether or not metrics will be published to any
|
||||
configured metrics plugin for the matching address. Default is `true`. Read more
|
||||
about [metrics](metrics.md).
|
||||
|
|
|
@ -86,15 +86,19 @@ JVM memory metrics are exported as well.
|
|||
|
||||
## Configuration
|
||||
|
||||
In `broker.xml` use the `metrics-plugin` element and specify the `class-name`
|
||||
attribute to configure your plugin, e.g.:
|
||||
All metrics are enabled by default. If you want to disable metrics for a
|
||||
particular address or set of addresses you can do so by setting the
|
||||
`enable-metrics` `address-setting` to `false`.
|
||||
|
||||
To configure the plugin itself use the `metrics-plugin` element in `broker.xml`
|
||||
and specify the `class-name` attribute, e.g.:
|
||||
|
||||
```xml
|
||||
<metrics-plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.LoggingMetricsPlugin" />
|
||||
```
|
||||
|
||||
As noted, the plugin can also be configured with key/value properties in order
|
||||
to customize its behavior as necessary, e.g.:
|
||||
The plugin can also be configured with key/value properties in order to
|
||||
customize the implementation as necessary, e.g.:
|
||||
|
||||
```xml
|
||||
<metrics-plugin class-name="org.example.MyMetricsPlugin">
|
||||
|
|
|
@ -149,10 +149,21 @@ public class MetricsPluginTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testForBasicMetricsPresenceAndValue() throws Exception {
|
||||
internalTestForBasicMetrics(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisablingMetrics() throws Exception {
|
||||
internalTestForBasicMetrics(false);
|
||||
}
|
||||
|
||||
private void internalTestForBasicMetrics(boolean enabled) throws Exception {
|
||||
final String data = "Simple Text " + UUID.randomUUID().toString();
|
||||
final String queueName = "simpleQueue";
|
||||
final String addressName = "simpleAddress";
|
||||
|
||||
server.getAddressSettingsRepository().getMatch(addressName).setEnableMetrics(enabled);
|
||||
|
||||
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
|
||||
ClientProducer producer = session.createProducer(addressName);
|
||||
ClientMessage message = session.createMessage(true);
|
||||
|
@ -165,14 +176,14 @@ public class MetricsPluginTest extends ActiveMQTestBase {
|
|||
|
||||
Map<Meter.Id, Double> metrics = getMetrics();
|
||||
|
||||
checkMetric(metrics, "artemis.message.count", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0);
|
||||
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0);
|
||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.message.count", "queue", queueName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0, enabled);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(queueName);
|
||||
session.start();
|
||||
|
@ -180,8 +191,8 @@ public class MetricsPluginTest extends ActiveMQTestBase {
|
|||
assertNotNull(message);
|
||||
|
||||
metrics = getMetrics();
|
||||
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 1.0, enabled);
|
||||
|
||||
message.acknowledge();
|
||||
assertEquals(data, message.getBodyBuffer().readString());
|
||||
|
@ -193,14 +204,14 @@ public class MetricsPluginTest extends ActiveMQTestBase {
|
|||
|
||||
metrics = getMetrics();
|
||||
|
||||
checkMetric(metrics, "artemis.message.count", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 1.0);
|
||||
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0);
|
||||
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0);
|
||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0);
|
||||
checkMetric(metrics, "artemis.message.count", "queue", queueName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0, enabled);
|
||||
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0, enabled);
|
||||
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0, enabled);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -247,13 +258,21 @@ public class MetricsPluginTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
public void checkMetric(Map<Meter.Id, Double> metrics, String metric, String tag, String tagValue, Double expectedValue) {
|
||||
checkMetric(metrics, metric, tag, tagValue, expectedValue, true);
|
||||
}
|
||||
|
||||
public void checkMetric(Map<Meter.Id, Double> metrics, String metric, String tag, String tagValue, Double expectedValue, boolean enabled) {
|
||||
OptionalDouble actualValue = metrics.entrySet().stream()
|
||||
.filter(entry -> metric.equals(entry.getKey().getName()))
|
||||
.filter(entry -> tagValue.equals(entry.getKey().getTag(tag)))
|
||||
.mapToDouble(Map.Entry::getValue)
|
||||
.findFirst();
|
||||
|
||||
assertTrue(metric + " for " + tag + " " + tagValue + " not present", actualValue.isPresent());
|
||||
assertEquals(metric + " not equal", expectedValue, actualValue.getAsDouble(), 0);
|
||||
if (enabled) {
|
||||
assertTrue(metric + " for " + tag + " " + tagValue + " not present", actualValue.isPresent());
|
||||
assertEquals(metric + " not equal", expectedValue, actualValue.getAsDouble(), 0);
|
||||
} else {
|
||||
assertFalse(metric + " for " + tag + " " + tagValue + " present", actualValue.isPresent());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue