diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index e179d5ccc2..059f786cbc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -25,6 +25,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers. */ public interface ActiveMQServerControl { + String CONNECTION_COUNT_DESCRIPTION = "Number of clients connected to this server"; + String TOTAL_CONNECTION_COUNT_DESCRIPTION = "Number of clients which have connected to this server since it was started"; + String ADDRESS_MEMORY_USAGE_DESCRIPTION = "Memory used by all the addresses on broker for in-memory messages"; /** * Returns this server's version. @@ -35,13 +38,13 @@ public interface ActiveMQServerControl { /** * Returns the number of clients connected to this server. */ - @Attribute(desc = "Number of clients connected to this server") + @Attribute(desc = CONNECTION_COUNT_DESCRIPTION) int getConnectionCount(); /** * Returns the number of clients which have connected to this server since it was started. */ - @Attribute(desc = "Number of clients which have connected to this server since it was started") + @Attribute(desc = TOTAL_CONNECTION_COUNT_DESCRIPTION) long getTotalConnectionCount(); /** @@ -435,7 +438,7 @@ public interface ActiveMQServerControl { /** * Returns the memory used by all the addresses on broker for in-memory messages */ - @Attribute(desc = "Memory used by all the addresses on broker for in-memory messages") + @Attribute(desc = ADDRESS_MEMORY_USAGE_DESCRIPTION) long getAddressMemoryUsage(); /** diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index 4bb100b5a9..90887b0e2c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -23,6 +23,8 @@ import java.util.Map; * An AddressControl is used to manage an address. */ public interface AddressControl { + String ROUTED_MESSAGE_COUNT_DESCRIPTION = "number of messages routed to one or more bindings"; + String UNROUTED_MESSAGE_COUNT_DESCRIPTION = "number of messages not routed to any bindings"; /** * Returns the managed address. @@ -101,19 +103,19 @@ public interface AddressControl { @Attribute(desc = "names of all bindings (both queues and diverts) bound to this address") String[] getBindingNames() throws Exception; - @Attribute(desc = "number of messages added to all the queues for this address") + @Attribute(desc = "number of messages currently in all queues bound to this address (includes scheduled, paged, and in-delivery messages)") long getMessageCount(); /** * Returns the number of messages routed to one or more bindings */ - @Attribute(desc = "number of messages routed to one or more bindings") + @Attribute(desc = ROUTED_MESSAGE_COUNT_DESCRIPTION) long getRoutedMessageCount(); /** * Returns the number of messages not routed to any bindings */ - @Attribute(desc = "number of messages not routed to any bindings") + @Attribute(desc = UNROUTED_MESSAGE_COUNT_DESCRIPTION) long getUnRoutedMessageCount(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 0d3ab0f945..2443d277ca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -24,7 +24,26 @@ import java.util.Map; * A QueueControl is used to manage a queue. */ public interface QueueControl { - // Attributes ---------------------------------------------------- + String MESSAGE_COUNT_DESCRIPTION = "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)"; + String DURABLE_MESSAGE_COUNT_DESCRIPTION = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)"; + String PERSISTENT_SIZE_DESCRIPTION = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)"; + String DURABLE_PERSISTENT_SIZE_DESCRIPTION = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)"; + + String SCHEDULED_MESSAGE_COUNT_DESCRIPTION = "number of scheduled messages in this queue"; + String DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION = "number of durable scheduled messages in this queue"; + String SCHEDULED_SIZE_DESCRIPTION = "persistent size of scheduled messages in this queue"; + String DURABLE_SCHEDULED_SIZE_DESCRIPTION = "persistent size of durable scheduled messages in this queue"; + + String DELIVERING_MESSAGE_COUNT_DESCRIPTION = "number of messages that this queue is currently delivering to its consumers"; + String DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION = "number of durable messages that this queue is currently delivering to its consumers"; + String DELIVERING_SIZE_DESCRIPTION = "persistent size of messages that this queue is currently delivering to its consumers"; + String DURABLE_DELIVERING_SIZE_DESCRIPTION = "persistent size of durable messages that this queue is currently delivering to its consumers"; + + String CONSUMER_COUNT_DESCRIPTION = "number of consumers consuming messages from this queue"; + String MESSAGES_ADDED_DESCRIPTION = "number of messages added to this queue since it was created"; + String MESSAGES_ACKNOWLEDGED_DESCRIPTION = "number of messages acknowledged from this queue since it was created"; + String MESSAGES_EXPIRED_DESCRIPTION = "number of messages expired from this queue since it was created"; + String MESSAGES_KILLED_DESCRIPTION = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts"; /** * Returns the name of this queue. @@ -77,7 +96,7 @@ public interface QueueControl { /** * Returns the number of messages currently in this queue. */ - @Attribute(desc = "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)") + @Attribute(desc = MESSAGE_COUNT_DESCRIPTION) long getMessageCount(); /** @@ -85,13 +104,13 @@ public interface QueueControl { * is the amount of space the message would take up on disk which is used to track how much data there * is to consume on this queue */ - @Attribute(desc = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)") + @Attribute(desc = PERSISTENT_SIZE_DESCRIPTION) long getPersistentSize(); /** * Returns the number of durable messages currently in this queue. */ - @Attribute(desc = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)") + @Attribute(desc = DURABLE_MESSAGE_COUNT_DESCRIPTION) long getDurableMessageCount(); /** @@ -99,73 +118,73 @@ public interface QueueControl { * is the amount of space the message would take up on disk which is used to track how much data there * is to consume on this queue */ - @Attribute(desc = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)") + @Attribute(desc = DURABLE_PERSISTENT_SIZE_DESCRIPTION) long getDurablePersistentSize(); /** * Returns the number of scheduled messages in this queue. */ - @Attribute(desc = "number of scheduled messages in this queue") + @Attribute(desc = SCHEDULED_MESSAGE_COUNT_DESCRIPTION) long getScheduledCount(); /** * Returns the size of scheduled messages in this queue. */ - @Attribute(desc = "persistent size of scheduled messages in this queue") + @Attribute(desc = SCHEDULED_SIZE_DESCRIPTION) long getScheduledSize(); /** * Returns the number of durable scheduled messages in this queue. */ - @Attribute(desc = "number of durable scheduled messages in this queue") + @Attribute(desc = DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION) long getDurableScheduledCount(); /** * Returns the size of durable scheduled messages in this queue. */ - @Attribute(desc = "persistent size of durable scheduled messages in this queue") + @Attribute(desc = DURABLE_SCHEDULED_SIZE_DESCRIPTION) long getDurableScheduledSize(); /** * Returns the number of consumers consuming messages from this queue. */ - @Attribute(desc = "number of consumers consuming messages from this queue") + @Attribute(desc = CONSUMER_COUNT_DESCRIPTION) int getConsumerCount(); /** * Returns the number of messages that this queue is currently delivering to its consumers. */ - @Attribute(desc = "number of messages that this queue is currently delivering to its consumers") + @Attribute(desc = DELIVERING_MESSAGE_COUNT_DESCRIPTION) int getDeliveringCount(); /** * Returns the persistent size of messages that this queue is currently delivering to its consumers. */ - @Attribute(desc = "persistent size of messages that this queue is currently delivering to its consumers") + @Attribute(desc = DELIVERING_SIZE_DESCRIPTION) long getDeliveringSize(); /** * Returns the number of durable messages that this queue is currently delivering to its consumers. */ - @Attribute(desc = "number of durable messages that this queue is currently delivering to its consumers") + @Attribute(desc = DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION) int getDurableDeliveringCount(); /** * Returns the size of durable messages that this queue is currently delivering to its consumers. */ - @Attribute(desc = "persistent size of durable messages that this queue is currently delivering to its consumers") + @Attribute(desc = DURABLE_DELIVERING_SIZE_DESCRIPTION) long getDurableDeliveringSize(); /** * Returns the number of messages added to this queue since it was created. */ - @Attribute(desc = "number of messages added to this queue since it was created") + @Attribute(desc = MESSAGES_ADDED_DESCRIPTION) long getMessagesAdded(); /** * Returns the number of messages added to this queue since it was created. */ - @Attribute(desc = "number of messages acknowledged from this queue since it was created") + @Attribute(desc = MESSAGES_ACKNOWLEDGED_DESCRIPTION) long getMessagesAcknowledged(); /** @@ -178,13 +197,13 @@ public interface QueueControl { /** * Returns the number of messages expired from this queue since it was created. */ - @Attribute(desc = "number of messages expired from this queue since it was created") + @Attribute(desc = MESSAGES_EXPIRED_DESCRIPTION) long getMessagesExpired(); /** * Returns the number of messages removed from this queue since it was created due to exceeding the max delivery attempts. */ - @Attribute(desc = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts") + @Attribute(desc = MESSAGES_KILLED_DESCRIPTION) long getMessagesKilled(); /** diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 0554df27f5..a732656a8b 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -70,6 +70,7 @@ org.apache.activemq:artemis-web org.apache.activemq.rest:artemis-rest org.apache.qpid:qpid-jms-client + io.micrometer:micrometer-core org.apache.geronimo.specs:geronimo-jms_2.0_spec diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index 826d119731..d6a21b3d9e 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -65,6 +65,8 @@ mvn:org.apache.commons/commons-configuration2/${commons.config.version} mvn:org.apache.commons/commons-text/1.6 mvn:org.apache.commons/commons-lang3/${commons.lang.version} + + mvn:org.apache.activemq/activemq-artemis-native/${activemq-artemis-native-version} mvn:org.apache.activemq/artemis-server-osgi/${pom.version} diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index e20179f492..f2b69d0cc1 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -131,6 +131,10 @@ org.apache.commons commons-configuration2 + + io.micrometer + micrometer-core + commons-io commons-io diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 04a95cbe01..0bc2b63bb9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; @@ -1013,6 +1014,8 @@ public interface Configuration { Configuration addSecuritySettingPlugin(SecuritySettingPlugin plugin); + Configuration setMetricsPlugin(ActiveMQMetricsPlugin plugin); + /** * @return list of {@link ConnectorServiceConfiguration} */ @@ -1020,6 +1023,8 @@ public interface Configuration { List getSecuritySettingPlugins(); + ActiveMQMetricsPlugin getMetricsPlugin(); + /** * The default password decoder */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index a6ece62798..31624f3a50 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -43,6 +43,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; @@ -260,6 +261,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private List securitySettingPlugins = new ArrayList<>(); + private ActiveMQMetricsPlugin metricsPlugin = null; + private final List brokerPlugins = new CopyOnWriteArrayList<>(); private final List brokerConnectionPlugins = new CopyOnWriteArrayList<>(); private final List brokerSessionPlugins = new CopyOnWriteArrayList<>(); @@ -1423,6 +1426,11 @@ public class ConfigurationImpl implements Configuration, Serializable { return this.securitySettingPlugins; } + @Override + public ActiveMQMetricsPlugin getMetricsPlugin() { + return this.metricsPlugin; + } + @Override public void registerBrokerPlugins(final List plugins) { plugins.forEach(plugin -> registerBrokerPlugin(plugin)); @@ -1632,6 +1640,12 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + @Override + public ConfigurationImpl setMetricsPlugin(final ActiveMQMetricsPlugin plugin) { + this.metricsPlugin = plugin; + return this; + } + @Override public Boolean isMaskPassword() { return maskPassword; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index b232cc989a..e064ecadff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -553,7 +554,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { parseDivertConfiguration(dvNode, config); } - // Persistence config config.setLargeMessagesDirectory(getString(e, "large-messages-directory", config.getLargeMessagesDirectory(), Validators.NOT_NULL_OR_EMPTY)); @@ -677,6 +677,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { parseBrokerPlugins(e, config); + NodeList metricsPlugin = e.getElementsByTagName("metrics-plugin"); + + if (metricsPlugin.getLength() != 0) { + parseMetricsPlugin(metricsPlugin.item(0), config); + } + NodeList connectorServiceConfigs = e.getElementsByTagName("connector-service"); ArrayList configs = new ArrayList<>(); @@ -767,6 +773,23 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { return properties; } + private ActiveMQMetricsPlugin parseMetricsPlugin(final Node item, final Configuration config) { + final String clazz = item.getAttributes().getNamedItem("class-name").getNodeValue(); + + Map properties = getMapOfChildPropertyElements(item); + + ActiveMQMetricsPlugin metricsPlugin = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ActiveMQMetricsPlugin run() { + return (ActiveMQMetricsPlugin) ClassloadingUtil.newInstanceFromClassLoader(FileConfigurationParser.class, clazz); + } + }); + + config.setMetricsPlugin(metricsPlugin.init(properties)); + + return metricsPlugin; + } + /** * @param e * @param config diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 5764a28c3a..1bcff225fe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -163,9 +163,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding this.addressQueueReaperPeriod = addressQueueReaperPeriod; if (wildcardConfiguration.isRoutingEnabled()) { - addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager); + addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager()); } else { - addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager); + addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager()); } this.idCacheSize = idCacheSize; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index c762b6c83c..4cf28456f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.CompositeAddress; import org.jboss.logging.Logger; @@ -65,18 +66,23 @@ public class SimpleAddressManager implements AddressManager { private final BindingsFactory bindingsFactory; + protected final MetricsManager metricsManager; + protected final WildcardConfiguration wildcardConfiguration; - public SimpleAddressManager(final BindingsFactory bindingsFactory, final StorageManager storageManager) { - this(bindingsFactory, new WildcardConfiguration(), storageManager); + public SimpleAddressManager(final BindingsFactory bindingsFactory, final StorageManager storageManager, + final MetricsManager metricsManager) { + this(bindingsFactory, new WildcardConfiguration(), storageManager, metricsManager); } public SimpleAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration, - final StorageManager storageManager) { + final StorageManager storageManager, + final MetricsManager metricsManager) { this.wildcardConfiguration = wildcardConfiguration; this.bindingsFactory = bindingsFactory; this.storageManager = storageManager; + this.metricsManager = metricsManager; } @Override @@ -253,7 +259,11 @@ public class SimpleAddressManager implements AddressManager { @Override public boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception { - return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null; + boolean added = addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null; + if (added) { + addressInfo.registerMeters(metricsManager); + } + return added; } @Override @@ -345,7 +355,13 @@ public class SimpleAddressManager implements AddressManager { @Override public AddressInfo removeAddressInfo(SimpleString address) throws Exception { - return addressInfoMap.remove(CompositeAddress.extractAddressName(address)); + final AddressInfo removed = addressInfoMap.remove(CompositeAddress.extractAddressName(address)); + + if (removed != null) { + removed.unregisterMeters(metricsManager); + } + + return removed; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java index 2180e0b036..c688472d8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/WildcardAddressManager.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.transaction.Transaction; import java.util.Collection; @@ -45,13 +46,17 @@ public class WildcardAddressManager extends SimpleAddressManager { private final Map wildCardAddresses = new ConcurrentHashMap<>(); - public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration, final - StorageManager storageManager) { - super(bindingsFactory, wildcardConfiguration, storageManager); + public WildcardAddressManager(final BindingsFactory bindingsFactory, + final WildcardConfiguration wildcardConfiguration, + final StorageManager storageManager, + final MetricsManager metricsManager) { + super(bindingsFactory, wildcardConfiguration, storageManager, metricsManager); } - public WildcardAddressManager(final BindingsFactory bindingsFactory, StorageManager storageManager) { - super(bindingsFactory, storageManager); + public WildcardAddressManager(final BindingsFactory bindingsFactory, + final StorageManager storageManager, + final MetricsManager metricsManager) { + super(bindingsFactory, storageManager, metricsManager); } @Override @@ -155,6 +160,7 @@ public class WildcardAddressManager extends SimpleAddressManager { //Remove from mappings so removeAndUpdateAddressMap processes and cleanup mappings.remove(address); removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration)); + removed.unregisterMeters(metricsManager); } return removed; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index cdb852075a..cd5ef022d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ConnectorsService; import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -345,6 +346,8 @@ public interface ActiveMQServer extends ServiceComponent { ResourceManager getResourceManager(); + MetricsManager getMetricsManager(); + List getSessions(String connectionID); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 63508a25e9..68ca620572 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import java.util.concurrent.atomic.AtomicBoolean; import javax.management.MBeanServer; import java.io.File; import java.io.IOException; @@ -47,6 +46,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -57,6 +57,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -64,6 +66,7 @@ import org.apache.activemq.artemis.core.config.ConfigurationUtils; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.config.FederationConfiguration; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; @@ -141,7 +144,7 @@ import org.apache.activemq.artemis.core.server.ServiceRegistry; import org.apache.activemq.artemis.core.server.cluster.BackupManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy; -import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.server.federation.FederationManager; import org.apache.activemq.artemis.core.server.files.FileMoveManager; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.GroupingHandler; @@ -151,6 +154,8 @@ import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler; import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; +import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames; +import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -162,7 +167,6 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; -import org.apache.activemq.artemis.core.server.federation.FederationManager; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl; @@ -266,6 +270,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private volatile ResourceManager resourceManager; + private volatile MetricsManager metricsManager; + private volatile ActiveMQServerControlImpl messagingServerControl; private volatile ClusterManager clusterManager; @@ -1157,6 +1163,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } stopComponent(managementService); + unregisterMeters(); stopComponent(resourceManager); stopComponent(postOffice); @@ -1462,6 +1469,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { return resourceManager; } + @Override + public MetricsManager getMetricsManager() { + return metricsManager; + } + @Override public Version getVersion() { return version; @@ -2716,6 +2728,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { pagingManager = createPagingManager(); resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool); + + /** + * If there is no plugin configured we don't want to instantiate a MetricsManager. This keeps the dependency + * on Micrometer as "optional" in the Maven pom.xml. This is particularly beneficial because optional dependencies + * are not required to be included in the OSGi bundle and the Micrometer jars apparently don't support OSGi. + */ + if (configuration.getMetricsPlugin() != null) { + metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsPlugin()); + } + + registerMeters(); + postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository); // This can't be created until node id is set @@ -2899,6 +2923,24 @@ public class ActiveMQServerImpl implements ActiveMQServer { callActivationCompleteCallbacks(); } + private void registerMeters() { + MetricsManager metricsManager = this.metricsManager; // volatile load + if (metricsManager != null) { + metricsManager.registerBrokerGauge(builder -> { + builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION); + builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION); + builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(getPagingManager().getGlobalSize()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION); + }); + } + } + + private void unregisterMeters() { + MetricsManager metricsManager = this.metricsManager; // volatile load + if (metricsManager != null) { + metricsManager.remove(ResourceNames.BROKER + "." + configuration.getName()); + } + } + private void deploySecurityFromConfiguration() { for (Map.Entry> entry : configuration.getSecurityRoles().entrySet()) { securityRepository.addMatch(entry.getKey(), entry.getValue(), true); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index ca2b2f1a3e..f1195c6aee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -16,15 +16,19 @@ */ package org.apache.activemq.artemis.core.server.impl; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.utils.CompositeAddress; -import org.apache.activemq.artemis.utils.PrefixUtil; - import java.util.EnumSet; import java.util.Map; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames; +import org.apache.activemq.artemis.core.server.metrics.MetricsManager; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.apache.activemq.artemis.utils.PrefixUtil; + public class AddressInfo { private long id; @@ -191,4 +195,18 @@ public class AddressInfo { return unRoutedMessageCountUpdater.get(this); } + public void registerMeters(MetricsManager metricsManager) { + if (metricsManager != null) { + metricsManager.registerAddressGauge(name.toString(), builder -> { + builder.register(AddressMetricNames.ROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getRoutedMessageCount()), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION); + builder.register(AddressMetricNames.UNROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getUnRoutedMessageCount()), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION); + }); + } + } + + public void unregisterMeters(MetricsManager metricsManager) { + if (metricsManager != null) { + metricsManager.remove(ResourceNames.ADDRESS + name); + } + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index aaa562c172..76c021983b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -50,6 +50,9 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; @@ -71,7 +74,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -81,6 +83,8 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; +import org.apache.activemq.artemis.core.server.metrics.MetricsManager; +import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -572,6 +576,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.user = user; this.factory = factory; + + registerMeters(); } // Bindable implementation ------------------------------------------------------------------------------------- @@ -1966,6 +1972,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { slowConsumerReaperFuture.cancel(false); } + unregisterMeters(); + tx.commit(); } catch (Exception e) { tx.rollback(); @@ -3852,6 +3860,44 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + private void registerMeters() { + String addressName = address.toString(); + String queueName = name.toString(); + + if (server != null && server.getMetricsManager() != null) { + MetricsManager metricsManager = server.getMetricsManager(); + + metricsManager.registerQueueGauge(queueName, addressName, (builder) -> { + builder.register(QueueMetricNames.MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION); + + builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getMessageCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurablePersistentSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION); + + builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION); + + builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION); + builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION); + }); + } + } + + private void unregisterMeters() { + if (server != null && server.getMetricsManager() != null) { + server.getMetricsManager().remove(ResourceNames.QUEUE + name); + } + } + private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener { @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/ActiveMQMetricsPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/ActiveMQMetricsPlugin.java new file mode 100644 index 0000000000..8cdf1542b8 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/ActiveMQMetricsPlugin.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.metrics; + +import java.io.Serializable; +import java.util.Map; + +import io.micrometer.core.instrument.MeterRegistry; + +public interface ActiveMQMetricsPlugin extends Serializable { + + ActiveMQMetricsPlugin init(Map options); + + MeterRegistry getRegistry(); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/AddressMetricNames.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/AddressMetricNames.java new file mode 100644 index 0000000000..902c6a09e8 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/AddressMetricNames.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.metrics; + +public class AddressMetricNames { + + public static final String ROUTED_MESSAGE_COUNT = "routed.message.count"; + public static final String UNROUTED_MESSAGE_COUNT = "unrouted.message.count"; + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/BrokerMetricNames.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/BrokerMetricNames.java new file mode 100644 index 0000000000..f487eecc7c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/BrokerMetricNames.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.metrics; + +public class BrokerMetricNames { + + public static final String CONNECTION_COUNT = "connection.count"; + public static final String TOTAL_CONNECTION_COUNT = "total.connection.count"; + public static final String ADDRESS_MEMORY_USAGE = "address.memory.usage"; + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java new file mode 100644 index 0000000000..b31be729bc --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.metrics; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.ToDoubleFunction; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; + +public class MetricsManager { + + private final String brokerName; + + private final MeterRegistry meterRegistry; + + private final Map> meters = new ConcurrentHashMap<>(); + + public MetricsManager(String brokerName, ActiveMQMetricsPlugin metricsPlugin) { + this.brokerName = brokerName; + meterRegistry = metricsPlugin.getRegistry(); + Metrics.globalRegistry.add(meterRegistry); + new JvmMemoryMetrics().bindTo(meterRegistry); + } + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + + @FunctionalInterface + public interface MetricGaugeBuilder { + + void register(String metricName, Object state, ToDoubleFunction f, String description); + } + + public void registerQueueGauge(String address, String queue, Consumer builder) { + final MeterRegistry meterRegistry = this.meterRegistry; + if (meterRegistry == null) { + return; + } + final List newMeters = new ArrayList<>(); + builder.accept((metricName, state, f, description) -> { + Gauge.Builder meter = Gauge + .builder("artemis." + metricName, state, f) + .tag("broker", brokerName) + .tag("address", address) + .tag("queue", queue) + .description(description); + newMeters.add(meter); + }); + final String resource = ResourceNames.QUEUE + queue; + this.meters.compute(resource, (s, meters) -> { + //the old meters are ignored on purpose + meters = new ArrayList<>(newMeters.size()); + for (Gauge.Builder gauge : newMeters) { + meters.add(gauge.register(meterRegistry)); + } + return meters; + }); + } + + public void registerAddressGauge(String address, Consumer builder) { + final MeterRegistry meterRegistry = this.meterRegistry; + if (meterRegistry == null) { + return; + } + final List newMeters = new ArrayList<>(); + builder.accept((metricName, state, f, description) -> { + Gauge.Builder meter = Gauge + .builder("artemis." + metricName, state, f) + .tag("broker", brokerName) + .tag("address", address) + .description(description); + newMeters.add(meter); + }); + final String resource = ResourceNames.ADDRESS + address; + this.meters.compute(resource, (s, meters) -> { + //the old meters are ignored on purpose + meters = new ArrayList<>(newMeters.size()); + for (Gauge.Builder gauge : newMeters) { + meters.add(gauge.register(meterRegistry)); + } + return meters; + }); + } + + public void registerBrokerGauge(Consumer builder) { + final MeterRegistry meterRegistry = this.meterRegistry; + if (meterRegistry == null) { + return; + } + final List newMeters = new ArrayList<>(); + builder.accept((metricName, state, f, description) -> { + Gauge.Builder meter = Gauge + .builder("artemis." + metricName, state, f) + .tag("broker", brokerName) + .description(description); + newMeters.add(meter); + }); + final String resource = ResourceNames.BROKER + "." + brokerName; + this.meters.compute(resource, (s, meters) -> { + //the old meters are ignored on purpose + meters = new ArrayList<>(newMeters.size()); + for (Gauge.Builder gauge : newMeters) { + meters.add(gauge.register(meterRegistry)); + } + return meters; + }); + } + + public void remove(String component) { + meters.computeIfPresent(component, (s, meters) -> { + if (meters == null) { + return null; + } + for (Meter meter : meters) { + Meter removed = meterRegistry.remove(meter); + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("Removed meter: " + removed.getId()); + } + } + return null; + }); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/QueueMetricNames.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/QueueMetricNames.java new file mode 100644 index 0000000000..65aea1a3df --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/QueueMetricNames.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.metrics; + +public class QueueMetricNames { + + public static final String MESSAGE_COUNT = "message.count"; + public static final String DURABLE_MESSAGE_COUNT = "durable.message.count"; + public static final String PERSISTENT_SIZE = "persistent.size"; + public static final String DURABLE_PERSISTENT_SIZE = "durable.persistent.size"; + + public static final String DELIVERING_MESSAGE_COUNT = "delivering.message.count"; + public static final String DELIVERING_DURABLE_MESSAGE_COUNT = "delivering.durable.message.count"; + public static final String DELIVERING_PERSISTENT_SIZE = "delivering.persistent_size"; + public static final String DELIVERING_DURABLE_PERSISTENT_SIZE = "delivering.durable.persistent.size"; + + public static final String SCHEDULED_MESSAGE_COUNT = "scheduled.message.count"; + public static final String SCHEDULED_DURABLE_MESSAGE_COUNT = "scheduled.durable.message.count"; + public static final String SCHEDULED_PERSISTENT_SIZE = "scheduled.persistent.size"; + public static final String SCHEDULED_DURABLE_PERSISTENT_SIZE = "scheduled_durable.persistent.size"; + + public static final String MESSAGES_ACKNOWLEDGED = "messages.acknowledged"; + public static final String MESSAGES_ADDED = "messages.added"; + public static final String MESSAGES_KILLED = "messages.killed"; + public static final String MESSAGES_EXPIRED = "messages.expired"; + public static final String CONSUMER_COUNT = "consumer.count"; +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/plugins/LoggingMetricsPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/plugins/LoggingMetricsPlugin.java new file mode 100644 index 0000000000..08839f09ae --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/plugins/LoggingMetricsPlugin.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.metrics.plugins; + +import java.util.Map; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.logging.LoggingMeterRegistry; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; + +public class LoggingMetricsPlugin implements ActiveMQMetricsPlugin { + + private transient MeterRegistry meterRegistry; + + @Override + public ActiveMQMetricsPlugin init(Map options) { + this.meterRegistry = new LoggingMeterRegistry(); + return this; + } + + @Override + public MeterRegistry getRegistry() { + return meterRegistry; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/plugins/SimpleMetricsPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/plugins/SimpleMetricsPlugin.java new file mode 100644 index 0000000000..abebb913bb --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/plugins/SimpleMetricsPlugin.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.metrics.plugins; + +import java.util.Map; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; + +/** + * A very simple metrics plugin used for testing + */ +public class SimpleMetricsPlugin implements ActiveMQMetricsPlugin { + + private transient MeterRegistry meterRegistry; + + private Map options; + + @Override + public ActiveMQMetricsPlugin init(Map options) { + this.meterRegistry = new SimpleMeterRegistry(); + this.options = options; + return this; + } + + @Override + public MeterRegistry getRegistry() { + return meterRegistry; + } + + public Map getOptions() { + return options; + } +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index e24facf405..03ff9f8537 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1020,6 +1020,33 @@ + + + + + a metrics plugin + + + + + + + properties to configure a plugin + + + + + + + + the name of the metrics plugin class to instantiate + + + + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index f7b558524b..c5ee2e7e56 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -29,12 +29,6 @@ import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; -import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; -import org.apache.activemq.artemis.utils.RandomUtil; -import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -50,15 +44,23 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin; +import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; +import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -434,6 +436,13 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(CriticalAnalyzerPolicy.HALT, conf.getCriticalAnalyzerPolicy()); assertEquals(false, conf.isJournalDatasync()); + + ActiveMQMetricsPlugin metricsPlugin = conf.getMetricsPlugin(); + assertTrue(metricsPlugin instanceof SimpleMetricsPlugin); + Map options = ((SimpleMetricsPlugin) metricsPlugin).getOptions(); + assertEquals("x", options.get("foo")); + assertEquals("y", options.get("bar")); + assertEquals("z", options.get("baz")); } private void verifyAddresses() { diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 4bc7679679..9042426290 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -280,6 +280,12 @@ + + + + + + diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 9eef2acc90..dc8dc45f30 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -193,6 +193,13 @@ + + + + + + + diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index 1dbfb74ff7..0ea3d1dc96 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -1012,6 +1012,33 @@ + + + + + a metrics plugin + + + + + + + properties to configure a plugin + + + + + + + + the name of the metrics plugin class to instantiate + + + + + + + diff --git a/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java b/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java index 64f89d2da0..2b0ff02294 100644 --- a/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java +++ b/artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java @@ -151,7 +151,8 @@ public class WebServerComponent implements ExternalComponent { } handlers.addHandler(homeContext); handlers.addHandler(instanceContext); - handlers.addHandler(defaultHandler); + handlers.addHandler(defaultHandler); // this should be last + server.setHandler(handlers); } diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index e2da2c82f6..2207b3230a 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -45,6 +45,7 @@ * [Extra Acknowledge Modes](pre-acknowledge.md) * [Management](management.md) * [Management Console](management-console.md) +* [Metrics](metrics.md) * [Security](security.md) * [Masking Passwords](masking-passwords.md) * [Broker Plugins](broker-plugins.md) diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 4eafb58458..c18c082d11 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -154,6 +154,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t [message-counter-sample-period](management.md#message-counters) | the sample period (in ms) to use for message counters. | 10000 [message-expiry-scan-period](message-expiry.md#configuring-the-expiry-reaper-thread) | how often (in ms) to scan for expired messages. | 30000 [message-expiry-thread-priority](message-expiry.md#configuring-the-expiry-reaper-thread)| the priority of the thread expiring messages. | 3 +[metrics-plugin](metrics.md) | [a plugin to export metrics](#metrics-plugin-type) | n/a [address-queue-scan-period](address-model.md#configuring-addresses-and-queues-via-address-settings) | how often (in ms) to scan for addresses & queues that should be removed. | 30000 name | node name; used in topology notifications if set. | n/a [password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a @@ -379,6 +380,14 @@ Name | Description [class-name](broker-plugins.md#registering-a-plugin) | the name of the broker plugin class to instantiate +## metrics-plugin type + +Name | Description +---|--- +[property](metrics.md)| properties to configure a plugin +[class-name](metrics.md) | the name of the metrics plugin class to instantiate + + ## resource-limit type Name | Description | Default diff --git a/docs/user-manual/en/metrics.md b/docs/user-manual/en/metrics.md new file mode 100644 index 0000000000..e1ed727d84 --- /dev/null +++ b/docs/user-manual/en/metrics.md @@ -0,0 +1,105 @@ +# Metrics + +Apache ActiveMQ Artemis can export metrics to a variety of monitoring systems +via the [Micrometer](https://micrometer.io/) vendor-neutral application metrics +facade. + +Important runtime metrics have been instrumented via the Micrometer API, and +all a user needs to do is implement `org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin` +in order to instantiate and configure a `io.micrometer.core.instrument.MeterRegistry` +implementation. Relevant implementations of `MeterRegistry` are available from +the [Micrometer code-base](https://github.com/micrometer-metrics/micrometer/tree/master/implementations). + +This is a simple interface: + +```java +public interface ActiveMQMetricsPlugin extends Serializable { + + ActiveMQMetricsPlugin init(Map options); + + MeterRegistry getRegistry(); +} +``` + +When the broker starts it will call `init` and pass in the `options` which can +be specified in XML as key/value properties. At this point the plugin should +instantiate and configure the `io.micrometer.core.instrument.MeterRegistry` +implementation. + +Later during the broker startup process it will call `getRegistry` in order to +get the `MeterRegistry` implementation and use it for registering meters. + +The broker ships with two `ActiveMQMetricsPlugin` implementations: + +- `org.apache.activemq.artemis.core.server.metrics.plugins.LoggingMetricsPlugin` + This plugin simply logs metrics. It's not very useful for production, but can + serve as a demonstration of the Micrometer integration. It takes no key/value + properties for configuration. + +- `org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin` + This plugin is used for testing. It is in-memory only and provides no external + output. It takes no key/value properties for configuration. + +## Metrics + +The following metrics are exported, categorized by component. A description for +each metric is exported along with the metric itself therefore the description +will not be repeated here. + +**Broker** + +- connection.count +- total.connection.count +- address.memory.usage + +**Address** + +- routed.message.count +- unrouted.message.count + +**Queue** + +- message.count +- durable.message.count +- persistent.size +- durable.persistent.size +- delivering.message.count +- delivering.durable.message.count +- delivering.persistent.size +- delivering.durable.persistent.size +- scheduled.message.count +- scheduled.durable.message.count +- scheduled.persistent.size +- scheduled.durable.persistent.size +- messages.acknowledged +- messages.added +- messages.killed +- messages.expired +- consumer.count + +It may appear that some higher level broker metrics are missing (e.g. total +message count). However, these metrics can be deduced by aggregating the +lower level metrics (e.g. aggregate the message.count metrics from all queues +to get the total). + +JVM memory metrics are exported as well. + +## Configuration + +In `broker.xml` use the `metrics-plugin` element and specify the `class-name` +attribute to configure your plugin, e.g.: + +```xml + +``` + +As noted, the plugin can also be configured with key/value properties in order +to customize its behavior as necessary, e.g.: + +```xml + + + + + +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5c055a0204..f235b2c9e7 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,7 @@ 0.7.9 0.7.9 2.4 + 1.1.4 2.4.3 @@ -703,6 +704,15 @@ 2.7.2 + + + io.micrometer + micrometer-core + ${version.micrometer} + true + + + org.apache.openwebbeans openwebbeans-impl diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java new file mode 100644 index 0000000000..4a08032730 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MetricsPluginTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.plugin; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import io.micrometer.core.instrument.Measurement; +import io.micrometer.core.instrument.Meter; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin; +import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Before; +import org.junit.Test; + +public class MetricsPluginTest extends ActiveMQTestBase { + + protected ActiveMQServer server; + protected ClientSession session; + protected ClientSessionFactory sf; + protected ServerLocator locator; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(false, createDefaultInVMConfig()); + server.getConfiguration().setMetricsPlugin(new SimpleMetricsPlugin().init(null)); + server.start(); + locator = createInVMNonHALocator(); + sf = createSessionFactory(locator); + session = addClientSession(sf.createSession(false, true, true)); + } + + @Test + public void testForBasicMetricsPresenceAndValue() throws Exception { + final String data = "Simple Text " + UUID.randomUUID().toString(); + final String queueName = "simpleQueue"; + final String addressName = "simpleAddress"; + + session.createQueue(addressName, RoutingType.ANYCAST, queueName, null, true); + ClientProducer producer = session.createProducer(addressName); + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeString(data); + producer.send(message); + producer.close(); + + Map metrics = getMetrics(); + + checkMetric(metrics, "'artemis.message.count'", queueName, 1.0); + checkMetric(metrics, "'artemis.messages.added'", queueName, 1.0); + checkMetric(metrics, "'artemis.messages.acknowledged'", queueName, 0.0); + checkMetric(metrics, "'artemis.durable.message.count'", queueName, 1.0); + checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 0.0); + checkMetric(metrics, "'artemis.routed.message.count'", addressName, 1.0); + checkMetric(metrics, "'artemis.unrouted.message.count'", addressName, 0.0); + checkMetric(metrics, "'artemis.consumer.count'", queueName, 0.0); + + ClientConsumer consumer = session.createConsumer(queueName); + session.start(); + message = consumer.receive(1000); + assertNotNull(message); + + metrics = getMetrics(); + checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 1.0); + checkMetric(metrics, "'artemis.consumer.count'", queueName, 1.0); + + message.acknowledge(); + assertEquals(data, message.getBodyBuffer().readString()); + session.commit(); // force the ack to be committed + + assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)).getMessagesAcknowledged() == 1, 1000, 100)); + + consumer.close(); + + metrics = getMetrics(); + + checkMetric(metrics, "'artemis.message.count'", queueName, 0.0); + checkMetric(metrics, "'artemis.messages.added'", queueName, 1.0); + checkMetric(metrics, "'artemis.messages.acknowledged'", queueName, 1.0); + checkMetric(metrics, "'artemis.durable.message.count'", queueName, 0.0); + checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 0.0); + checkMetric(metrics, "'artemis.routed.message.count'", addressName, 1.0); + checkMetric(metrics, "'artemis.unrouted.message.count'", addressName, 0.0); + checkMetric(metrics, "'artemis.consumer.count'", queueName, 0.0); + } + + public Map getMetrics() { + Map metrics = new HashMap<>(); + List meters = server.getMetricsManager().getMeterRegistry().getMeters(); + assertTrue(meters.size() > 0); + for (Meter meter : meters) { + Iterable measurements = meter.measure(); + for (Measurement measurement : measurements) { + metrics.put(meter.getId().toString(), measurement.getValue()); + // if (meter.getId().getName().startsWith("artemis")) { + // IntegrationTestLogger.LOGGER.info(meter.getId().toString() + ": " + measurement.getValue() + " (" + meter.getId().getDescription() + ")"); + // } + } + } + return metrics; + } + + public void checkMetric(Map metrics, String metric, String tag, Double value) { + boolean contains = false; + for (Map.Entry entry : metrics.entrySet()) { + if (entry.getKey().contains(metric) && entry.getKey().contains(tag)) { + contains = true; + assertEquals(metric + " not equal", value, entry.getValue(), 0); + break; + } + } + assertTrue(contains); + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 6b891af7da..a809eb5dde 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -49,7 +49,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { @Test public void testUnitOnWildCardFailingScenario() throws Exception { int errors = 0; - WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null); + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null); ad.addBinding(new BindingFake("Topic1", "Topic1")); ad.addBinding(new BindingFake("Topic1", "one")); ad.addBinding(new BindingFake("*", "two")); @@ -79,7 +79,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { @Test public void testUnitOnWildCardFailingScenarioFQQN() throws Exception { int errors = 0; - WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null); + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null); ad.addBinding(new BindingFake("Topic1", "Topic1")); ad.addBinding(new BindingFake("Topic1", "one")); ad.addBinding(new BindingFake("*", "two")); @@ -114,7 +114,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { @Test public void testWildCardAddressRemoval() throws Exception { - WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null); + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST)); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.#"), RoutingType.MULTICAST)); ad.addBinding(new BindingFake("Topic1.topic", "two")); @@ -142,7 +142,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { final WildcardConfiguration configuration = new WildcardConfiguration(); configuration.setAnyWords('>'); - WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null); + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST)); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST)); ad.addBinding(new BindingFake("Topic1.>", "one")); @@ -171,7 +171,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { final WildcardConfiguration configuration = new WildcardConfiguration(); configuration.setAnyWords('>'); - WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null); + WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST)); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST)); ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test.test1"), RoutingType.MULTICAST));