ARTEMIS-2308 Support exporting metrics

This commit is contained in:
Justin Bertram 2019-05-23 12:48:14 -04:00 committed by Clebert Suconic
parent 7fa1b13cc4
commit 5768f6e2f3
35 changed files with 966 additions and 58 deletions

View File

@ -25,6 +25,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
*/
public interface ActiveMQServerControl {
String CONNECTION_COUNT_DESCRIPTION = "Number of clients connected to this server";
String TOTAL_CONNECTION_COUNT_DESCRIPTION = "Number of clients which have connected to this server since it was started";
String ADDRESS_MEMORY_USAGE_DESCRIPTION = "Memory used by all the addresses on broker for in-memory messages";
/**
* Returns this server's version.
@ -35,13 +38,13 @@ public interface ActiveMQServerControl {
/**
* Returns the number of clients connected to this server.
*/
@Attribute(desc = "Number of clients connected to this server")
@Attribute(desc = CONNECTION_COUNT_DESCRIPTION)
int getConnectionCount();
/**
* Returns the number of clients which have connected to this server since it was started.
*/
@Attribute(desc = "Number of clients which have connected to this server since it was started")
@Attribute(desc = TOTAL_CONNECTION_COUNT_DESCRIPTION)
long getTotalConnectionCount();
/**
@ -435,7 +438,7 @@ public interface ActiveMQServerControl {
/**
* Returns the memory used by all the addresses on broker for in-memory messages
*/
@Attribute(desc = "Memory used by all the addresses on broker for in-memory messages")
@Attribute(desc = ADDRESS_MEMORY_USAGE_DESCRIPTION)
long getAddressMemoryUsage();
/**

View File

@ -23,6 +23,8 @@ import java.util.Map;
* An AddressControl is used to manage an address.
*/
public interface AddressControl {
String ROUTED_MESSAGE_COUNT_DESCRIPTION = "number of messages routed to one or more bindings";
String UNROUTED_MESSAGE_COUNT_DESCRIPTION = "number of messages not routed to any bindings";
/**
* Returns the managed address.
@ -101,19 +103,19 @@ public interface AddressControl {
@Attribute(desc = "names of all bindings (both queues and diverts) bound to this address")
String[] getBindingNames() throws Exception;
@Attribute(desc = "number of messages added to all the queues for this address")
@Attribute(desc = "number of messages currently in all queues bound to this address (includes scheduled, paged, and in-delivery messages)")
long getMessageCount();
/**
* Returns the number of messages routed to one or more bindings
*/
@Attribute(desc = "number of messages routed to one or more bindings")
@Attribute(desc = ROUTED_MESSAGE_COUNT_DESCRIPTION)
long getRoutedMessageCount();
/**
* Returns the number of messages not routed to any bindings
*/
@Attribute(desc = "number of messages not routed to any bindings")
@Attribute(desc = UNROUTED_MESSAGE_COUNT_DESCRIPTION)
long getUnRoutedMessageCount();

View File

@ -24,7 +24,26 @@ import java.util.Map;
* A QueueControl is used to manage a queue.
*/
public interface QueueControl {
// Attributes ----------------------------------------------------
String MESSAGE_COUNT_DESCRIPTION = "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)";
String DURABLE_MESSAGE_COUNT_DESCRIPTION = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)";
String PERSISTENT_SIZE_DESCRIPTION = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)";
String DURABLE_PERSISTENT_SIZE_DESCRIPTION = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)";
String SCHEDULED_MESSAGE_COUNT_DESCRIPTION = "number of scheduled messages in this queue";
String DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION = "number of durable scheduled messages in this queue";
String SCHEDULED_SIZE_DESCRIPTION = "persistent size of scheduled messages in this queue";
String DURABLE_SCHEDULED_SIZE_DESCRIPTION = "persistent size of durable scheduled messages in this queue";
String DELIVERING_MESSAGE_COUNT_DESCRIPTION = "number of messages that this queue is currently delivering to its consumers";
String DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION = "number of durable messages that this queue is currently delivering to its consumers";
String DELIVERING_SIZE_DESCRIPTION = "persistent size of messages that this queue is currently delivering to its consumers";
String DURABLE_DELIVERING_SIZE_DESCRIPTION = "persistent size of durable messages that this queue is currently delivering to its consumers";
String CONSUMER_COUNT_DESCRIPTION = "number of consumers consuming messages from this queue";
String MESSAGES_ADDED_DESCRIPTION = "number of messages added to this queue since it was created";
String MESSAGES_ACKNOWLEDGED_DESCRIPTION = "number of messages acknowledged from this queue since it was created";
String MESSAGES_EXPIRED_DESCRIPTION = "number of messages expired from this queue since it was created";
String MESSAGES_KILLED_DESCRIPTION = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts";
/**
* Returns the name of this queue.
@ -77,7 +96,7 @@ public interface QueueControl {
/**
* Returns the number of messages currently in this queue.
*/
@Attribute(desc = "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
@Attribute(desc = MESSAGE_COUNT_DESCRIPTION)
long getMessageCount();
/**
@ -85,13 +104,13 @@ public interface QueueControl {
* is the amount of space the message would take up on disk which is used to track how much data there
* is to consume on this queue
*/
@Attribute(desc = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)")
@Attribute(desc = PERSISTENT_SIZE_DESCRIPTION)
long getPersistentSize();
/**
* Returns the number of durable messages currently in this queue.
*/
@Attribute(desc = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
@Attribute(desc = DURABLE_MESSAGE_COUNT_DESCRIPTION)
long getDurableMessageCount();
/**
@ -99,73 +118,73 @@ public interface QueueControl {
* is the amount of space the message would take up on disk which is used to track how much data there
* is to consume on this queue
*/
@Attribute(desc = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)")
@Attribute(desc = DURABLE_PERSISTENT_SIZE_DESCRIPTION)
long getDurablePersistentSize();
/**
* Returns the number of scheduled messages in this queue.
*/
@Attribute(desc = "number of scheduled messages in this queue")
@Attribute(desc = SCHEDULED_MESSAGE_COUNT_DESCRIPTION)
long getScheduledCount();
/**
* Returns the size of scheduled messages in this queue.
*/
@Attribute(desc = "persistent size of scheduled messages in this queue")
@Attribute(desc = SCHEDULED_SIZE_DESCRIPTION)
long getScheduledSize();
/**
* Returns the number of durable scheduled messages in this queue.
*/
@Attribute(desc = "number of durable scheduled messages in this queue")
@Attribute(desc = DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION)
long getDurableScheduledCount();
/**
* Returns the size of durable scheduled messages in this queue.
*/
@Attribute(desc = "persistent size of durable scheduled messages in this queue")
@Attribute(desc = DURABLE_SCHEDULED_SIZE_DESCRIPTION)
long getDurableScheduledSize();
/**
* Returns the number of consumers consuming messages from this queue.
*/
@Attribute(desc = "number of consumers consuming messages from this queue")
@Attribute(desc = CONSUMER_COUNT_DESCRIPTION)
int getConsumerCount();
/**
* Returns the number of messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "number of messages that this queue is currently delivering to its consumers")
@Attribute(desc = DELIVERING_MESSAGE_COUNT_DESCRIPTION)
int getDeliveringCount();
/**
* Returns the persistent size of messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "persistent size of messages that this queue is currently delivering to its consumers")
@Attribute(desc = DELIVERING_SIZE_DESCRIPTION)
long getDeliveringSize();
/**
* Returns the number of durable messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "number of durable messages that this queue is currently delivering to its consumers")
@Attribute(desc = DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION)
int getDurableDeliveringCount();
/**
* Returns the size of durable messages that this queue is currently delivering to its consumers.
*/
@Attribute(desc = "persistent size of durable messages that this queue is currently delivering to its consumers")
@Attribute(desc = DURABLE_DELIVERING_SIZE_DESCRIPTION)
long getDurableDeliveringSize();
/**
* Returns the number of messages added to this queue since it was created.
*/
@Attribute(desc = "number of messages added to this queue since it was created")
@Attribute(desc = MESSAGES_ADDED_DESCRIPTION)
long getMessagesAdded();
/**
* Returns the number of messages added to this queue since it was created.
*/
@Attribute(desc = "number of messages acknowledged from this queue since it was created")
@Attribute(desc = MESSAGES_ACKNOWLEDGED_DESCRIPTION)
long getMessagesAcknowledged();
/**
@ -178,13 +197,13 @@ public interface QueueControl {
/**
* Returns the number of messages expired from this queue since it was created.
*/
@Attribute(desc = "number of messages expired from this queue since it was created")
@Attribute(desc = MESSAGES_EXPIRED_DESCRIPTION)
long getMessagesExpired();
/**
* Returns the number of messages removed from this queue since it was created due to exceeding the max delivery attempts.
*/
@Attribute(desc = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts")
@Attribute(desc = MESSAGES_KILLED_DESCRIPTION)
long getMessagesKilled();
/**

View File

@ -70,6 +70,7 @@
<include>org.apache.activemq:artemis-web</include>
<include>org.apache.activemq.rest:artemis-rest</include>
<include>org.apache.qpid:qpid-jms-client</include>
<include>io.micrometer:micrometer-core</include>
<!-- dependencies -->
<include>org.apache.geronimo.specs:geronimo-jms_2.0_spec</include>

View File

@ -65,6 +65,8 @@
<bundle dependency="true">mvn:org.apache.commons/commons-configuration2/${commons.config.version}</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-text/1.6</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
<!-- Micrometer can't be included until it supports OSGi. It is currently an "optional" Maven dependency. -->
<!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle-->
<bundle>mvn:org.apache.activemq/activemq-artemis-native/${activemq-artemis-native-version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-server-osgi/${pom.version}</bundle>

View File

@ -131,6 +131,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
@ -1013,6 +1014,8 @@ public interface Configuration {
Configuration addSecuritySettingPlugin(SecuritySettingPlugin plugin);
Configuration setMetricsPlugin(ActiveMQMetricsPlugin plugin);
/**
* @return list of {@link ConnectorServiceConfiguration}
*/
@ -1020,6 +1023,8 @@ public interface Configuration {
List<SecuritySettingPlugin> getSecuritySettingPlugins();
ActiveMQMetricsPlugin getMetricsPlugin();
/**
* The default password decoder
*/

View File

@ -43,6 +43,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
@ -260,6 +261,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
private ActiveMQMetricsPlugin metricsPlugin = null;
private final List<ActiveMQServerBasePlugin> brokerPlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerConnectionPlugin> brokerConnectionPlugins = new CopyOnWriteArrayList<>();
private final List<ActiveMQServerSessionPlugin> brokerSessionPlugins = new CopyOnWriteArrayList<>();
@ -1423,6 +1426,11 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this.securitySettingPlugins;
}
@Override
public ActiveMQMetricsPlugin getMetricsPlugin() {
return this.metricsPlugin;
}
@Override
public void registerBrokerPlugins(final List<ActiveMQServerBasePlugin> plugins) {
plugins.forEach(plugin -> registerBrokerPlugin(plugin));
@ -1632,6 +1640,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public ConfigurationImpl setMetricsPlugin(final ActiveMQMetricsPlugin plugin) {
this.metricsPlugin = plugin;
return this;
}
@Override
public Boolean isMaskPassword() {
return maskPassword;

View File

@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -553,7 +554,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
parseDivertConfiguration(dvNode, config);
}
// Persistence config
config.setLargeMessagesDirectory(getString(e, "large-messages-directory", config.getLargeMessagesDirectory(), Validators.NOT_NULL_OR_EMPTY));
@ -677,6 +677,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
parseBrokerPlugins(e, config);
NodeList metricsPlugin = e.getElementsByTagName("metrics-plugin");
if (metricsPlugin.getLength() != 0) {
parseMetricsPlugin(metricsPlugin.item(0), config);
}
NodeList connectorServiceConfigs = e.getElementsByTagName("connector-service");
ArrayList<ConnectorServiceConfiguration> configs = new ArrayList<>();
@ -767,6 +773,23 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
return properties;
}
private ActiveMQMetricsPlugin parseMetricsPlugin(final Node item, final Configuration config) {
final String clazz = item.getAttributes().getNamedItem("class-name").getNodeValue();
Map<String, String> properties = getMapOfChildPropertyElements(item);
ActiveMQMetricsPlugin metricsPlugin = AccessController.doPrivileged(new PrivilegedAction<ActiveMQMetricsPlugin>() {
@Override
public ActiveMQMetricsPlugin run() {
return (ActiveMQMetricsPlugin) ClassloadingUtil.newInstanceFromClassLoader(FileConfigurationParser.class, clazz);
}
});
config.setMetricsPlugin(metricsPlugin.init(properties));
return metricsPlugin;
}
/**
* @param e
* @param config

View File

@ -163,9 +163,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
this.addressQueueReaperPeriod = addressQueueReaperPeriod;
if (wildcardConfiguration.isRoutingEnabled()) {
addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager);
addressManager = new WildcardAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager());
} else {
addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager);
addressManager = new SimpleAddressManager(this, wildcardConfiguration, storageManager, server.getMetricsManager());
}
this.idCacheSize = idCacheSize;

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.jboss.logging.Logger;
@ -65,18 +66,23 @@ public class SimpleAddressManager implements AddressManager {
private final BindingsFactory bindingsFactory;
protected final MetricsManager metricsManager;
protected final WildcardConfiguration wildcardConfiguration;
public SimpleAddressManager(final BindingsFactory bindingsFactory, final StorageManager storageManager) {
this(bindingsFactory, new WildcardConfiguration(), storageManager);
public SimpleAddressManager(final BindingsFactory bindingsFactory, final StorageManager storageManager,
final MetricsManager metricsManager) {
this(bindingsFactory, new WildcardConfiguration(), storageManager, metricsManager);
}
public SimpleAddressManager(final BindingsFactory bindingsFactory,
final WildcardConfiguration wildcardConfiguration,
final StorageManager storageManager) {
final StorageManager storageManager,
final MetricsManager metricsManager) {
this.wildcardConfiguration = wildcardConfiguration;
this.bindingsFactory = bindingsFactory;
this.storageManager = storageManager;
this.metricsManager = metricsManager;
}
@Override
@ -253,7 +259,11 @@ public class SimpleAddressManager implements AddressManager {
@Override
public boolean reloadAddressInfo(AddressInfo addressInfo) throws Exception {
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
boolean added = addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
if (added) {
addressInfo.registerMeters(metricsManager);
}
return added;
}
@Override
@ -345,7 +355,13 @@ public class SimpleAddressManager implements AddressManager {
@Override
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
return addressInfoMap.remove(CompositeAddress.extractAddressName(address));
final AddressInfo removed = addressInfoMap.remove(CompositeAddress.extractAddressName(address));
if (removed != null) {
removed.unregisterMeters(metricsManager);
}
return removed;
}
@Override

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import java.util.Collection;
@ -45,13 +46,17 @@ public class WildcardAddressManager extends SimpleAddressManager {
private final Map<SimpleString, Address> wildCardAddresses = new ConcurrentHashMap<>();
public WildcardAddressManager(final BindingsFactory bindingsFactory, final WildcardConfiguration wildcardConfiguration, final
StorageManager storageManager) {
super(bindingsFactory, wildcardConfiguration, storageManager);
public WildcardAddressManager(final BindingsFactory bindingsFactory,
final WildcardConfiguration wildcardConfiguration,
final StorageManager storageManager,
final MetricsManager metricsManager) {
super(bindingsFactory, wildcardConfiguration, storageManager, metricsManager);
}
public WildcardAddressManager(final BindingsFactory bindingsFactory, StorageManager storageManager) {
super(bindingsFactory, storageManager);
public WildcardAddressManager(final BindingsFactory bindingsFactory,
final StorageManager storageManager,
final MetricsManager metricsManager) {
super(bindingsFactory, storageManager, metricsManager);
}
@Override
@ -155,6 +160,7 @@ public class WildcardAddressManager extends SimpleAddressManager {
//Remove from mappings so removeAndUpdateAddressMap processes and cleanup
mappings.remove(address);
removeAndUpdateAddressMap(new AddressImpl(removed.getName(), wildcardConfiguration));
removed.unregisterMeters(metricsManager);
}
return removed;
}

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@ -345,6 +346,8 @@ public interface ActiveMQServer extends ServiceComponent {
ResourceManager getResourceManager();
MetricsManager getMetricsManager();
List<ServerSession> getSessions(String connectionID);
/**

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import java.io.File;
import java.io.IOException;
@ -47,6 +46,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -57,6 +57,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@ -64,6 +66,7 @@ import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@ -141,7 +144,7 @@ import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.server.federation.FederationManager;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@ -151,6 +154,8 @@ import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@ -162,7 +167,6 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.federation.FederationManager;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
@ -266,6 +270,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private volatile ResourceManager resourceManager;
private volatile MetricsManager metricsManager;
private volatile ActiveMQServerControlImpl messagingServerControl;
private volatile ClusterManager clusterManager;
@ -1157,6 +1163,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
stopComponent(managementService);
unregisterMeters();
stopComponent(resourceManager);
stopComponent(postOffice);
@ -1462,6 +1469,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return resourceManager;
}
@Override
public MetricsManager getMetricsManager() {
return metricsManager;
}
@Override
public Version getVersion() {
return version;
@ -2716,6 +2728,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
pagingManager = createPagingManager();
resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);
/**
* If there is no plugin configured we don't want to instantiate a MetricsManager. This keeps the dependency
* on Micrometer as "optional" in the Maven pom.xml. This is particularly beneficial because optional dependencies
* are not required to be included in the OSGi bundle and the Micrometer jars apparently don't support OSGi.
*/
if (configuration.getMetricsPlugin() != null) {
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsPlugin());
}
registerMeters();
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
// This can't be created until node id is set
@ -2899,6 +2923,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callActivationCompleteCallbacks();
}
private void registerMeters() {
MetricsManager metricsManager = this.metricsManager; // volatile load
if (metricsManager != null) {
metricsManager.registerBrokerGauge(builder -> {
builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(getPagingManager().getGlobalSize()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
});
}
}
private void unregisterMeters() {
MetricsManager metricsManager = this.metricsManager; // volatile load
if (metricsManager != null) {
metricsManager.remove(ResourceNames.BROKER + "." + configuration.getName());
}
}
private void deploySecurityFromConfiguration() {
for (Map.Entry<String, Set<Role>> entry : configuration.getSecurityRoles().entrySet()) {
securityRepository.addMatch(entry.getKey(), entry.getValue(), true);

View File

@ -16,15 +16,19 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.PrefixUtil;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.PrefixUtil;
public class AddressInfo {
private long id;
@ -191,4 +195,18 @@ public class AddressInfo {
return unRoutedMessageCountUpdater.get(this);
}
public void registerMeters(MetricsManager metricsManager) {
if (metricsManager != null) {
metricsManager.registerAddressGauge(name.toString(), builder -> {
builder.register(AddressMetricNames.ROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getRoutedMessageCount()), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
builder.register(AddressMetricNames.UNROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getUnRoutedMessageCount()), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
});
}
}
public void unregisterMeters(MetricsManager metricsManager) {
if (metricsManager != null) {
metricsManager.remove(ResourceNames.ADDRESS + name);
}
}
}

View File

@ -50,6 +50,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
@ -71,7 +74,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -81,6 +83,8 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -572,6 +576,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.user = user;
this.factory = factory;
registerMeters();
}
// Bindable implementation -------------------------------------------------------------------------------------
@ -1966,6 +1972,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
slowConsumerReaperFuture.cancel(false);
}
unregisterMeters();
tx.commit();
} catch (Exception e) {
tx.rollback();
@ -3852,6 +3860,44 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private void registerMeters() {
String addressName = address.toString();
String queueName = name.toString();
if (server != null && server.getMetricsManager() != null) {
MetricsManager metricsManager = server.getMetricsManager();
metricsManager.registerQueueGauge(queueName, addressName, (builder) -> {
builder.register(QueueMetricNames.MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, pendingMetrics, metrics -> Double.valueOf(pendingMetrics.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getMessageCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurableMessageCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, deliveringMetrics, metrics -> Double.valueOf(deliveringMetrics.getDurablePersistentSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
});
}
}
private void unregisterMeters() {
if (server != null && server.getMetricsManager() != null) {
server.getMetricsManager().remove(ResourceNames.QUEUE + name);
}
}
private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener {
@Override

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;
import java.io.Serializable;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
public interface ActiveMQMetricsPlugin extends Serializable {
ActiveMQMetricsPlugin init(Map<String, String> options);
MeterRegistry getRegistry();
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;
public class AddressMetricNames {
public static final String ROUTED_MESSAGE_COUNT = "routed.message.count";
public static final String UNROUTED_MESSAGE_COUNT = "unrouted.message.count";
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;
public class BrokerMetricNames {
public static final String CONNECTION_COUNT = "connection.count";
public static final String TOTAL_CONNECTION_COUNT = "total.connection.count";
public static final String ADDRESS_MEMORY_USAGE = "address.memory.usage";
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.ToDoubleFunction;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
public class MetricsManager {
private final String brokerName;
private final MeterRegistry meterRegistry;
private final Map<String, List<Meter>> meters = new ConcurrentHashMap<>();
public MetricsManager(String brokerName, ActiveMQMetricsPlugin metricsPlugin) {
this.brokerName = brokerName;
meterRegistry = metricsPlugin.getRegistry();
Metrics.globalRegistry.add(meterRegistry);
new JvmMemoryMetrics().bindTo(meterRegistry);
}
public MeterRegistry getMeterRegistry() {
return meterRegistry;
}
@FunctionalInterface
public interface MetricGaugeBuilder {
void register(String metricName, Object state, ToDoubleFunction f, String description);
}
public void registerQueueGauge(String address, String queue, Consumer<MetricGaugeBuilder> builder) {
final MeterRegistry meterRegistry = this.meterRegistry;
if (meterRegistry == null) {
return;
}
final List<Gauge.Builder> newMeters = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.tag("address", address)
.tag("queue", queue)
.description(description);
newMeters.add(meter);
});
final String resource = ResourceNames.QUEUE + queue;
this.meters.compute(resource, (s, meters) -> {
//the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size());
for (Gauge.Builder gauge : newMeters) {
meters.add(gauge.register(meterRegistry));
}
return meters;
});
}
public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
final MeterRegistry meterRegistry = this.meterRegistry;
if (meterRegistry == null) {
return;
}
final List<Gauge.Builder> newMeters = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.tag("address", address)
.description(description);
newMeters.add(meter);
});
final String resource = ResourceNames.ADDRESS + address;
this.meters.compute(resource, (s, meters) -> {
//the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size());
for (Gauge.Builder gauge : newMeters) {
meters.add(gauge.register(meterRegistry));
}
return meters;
});
}
public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
final MeterRegistry meterRegistry = this.meterRegistry;
if (meterRegistry == null) {
return;
}
final List<Gauge.Builder> newMeters = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.description(description);
newMeters.add(meter);
});
final String resource = ResourceNames.BROKER + "." + brokerName;
this.meters.compute(resource, (s, meters) -> {
//the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size());
for (Gauge.Builder gauge : newMeters) {
meters.add(gauge.register(meterRegistry));
}
return meters;
});
}
public void remove(String component) {
meters.computeIfPresent(component, (s, meters) -> {
if (meters == null) {
return null;
}
for (Meter meter : meters) {
Meter removed = meterRegistry.remove(meter);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Removed meter: " + removed.getId());
}
}
return null;
});
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;
public class QueueMetricNames {
public static final String MESSAGE_COUNT = "message.count";
public static final String DURABLE_MESSAGE_COUNT = "durable.message.count";
public static final String PERSISTENT_SIZE = "persistent.size";
public static final String DURABLE_PERSISTENT_SIZE = "durable.persistent.size";
public static final String DELIVERING_MESSAGE_COUNT = "delivering.message.count";
public static final String DELIVERING_DURABLE_MESSAGE_COUNT = "delivering.durable.message.count";
public static final String DELIVERING_PERSISTENT_SIZE = "delivering.persistent_size";
public static final String DELIVERING_DURABLE_PERSISTENT_SIZE = "delivering.durable.persistent.size";
public static final String SCHEDULED_MESSAGE_COUNT = "scheduled.message.count";
public static final String SCHEDULED_DURABLE_MESSAGE_COUNT = "scheduled.durable.message.count";
public static final String SCHEDULED_PERSISTENT_SIZE = "scheduled.persistent.size";
public static final String SCHEDULED_DURABLE_PERSISTENT_SIZE = "scheduled_durable.persistent.size";
public static final String MESSAGES_ACKNOWLEDGED = "messages.acknowledged";
public static final String MESSAGES_ADDED = "messages.added";
public static final String MESSAGES_KILLED = "messages.killed";
public static final String MESSAGES_EXPIRED = "messages.expired";
public static final String CONSUMER_COUNT = "consumer.count";
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics.plugins;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
public class LoggingMetricsPlugin implements ActiveMQMetricsPlugin {
private transient MeterRegistry meterRegistry;
@Override
public ActiveMQMetricsPlugin init(Map<String, String> options) {
this.meterRegistry = new LoggingMeterRegistry();
return this;
}
@Override
public MeterRegistry getRegistry() {
return meterRegistry;
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics.plugins;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
/**
* A very simple metrics plugin used for testing
*/
public class SimpleMetricsPlugin implements ActiveMQMetricsPlugin {
private transient MeterRegistry meterRegistry;
private Map<String, String> options;
@Override
public ActiveMQMetricsPlugin init(Map<String, String> options) {
this.meterRegistry = new SimpleMeterRegistry();
this.options = options;
return this;
}
@Override
public MeterRegistry getRegistry() {
return meterRegistry;
}
public Map<String, String> getOptions() {
return options;
}
}

View File

@ -1020,6 +1020,33 @@
</xsd:complexType>
</xsd:element>
<xsd:element name="metrics-plugin" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:annotation>
<xsd:documentation>
a metrics plugin
</xsd:documentation>
</xsd:annotation>
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
properties to configure a plugin
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="class-name" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
the name of the metrics plugin class to instantiate
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="address-settings" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -29,12 +29,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -50,15 +44,23 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -434,6 +436,13 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(CriticalAnalyzerPolicy.HALT, conf.getCriticalAnalyzerPolicy());
assertEquals(false, conf.isJournalDatasync());
ActiveMQMetricsPlugin metricsPlugin = conf.getMetricsPlugin();
assertTrue(metricsPlugin instanceof SimpleMetricsPlugin);
Map<String, String> options = ((SimpleMetricsPlugin) metricsPlugin).getOptions();
assertEquals("x", options.get("foo"));
assertEquals("y", options.get("bar"));
assertEquals("z", options.get("baz"));
}
private void verifyAddresses() {

View File

@ -280,6 +280,12 @@
</federation>
</federations>
<metrics-plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>
<property key="baz" value="z"/>
</metrics-plugin>
<ha-policy>
<!--only one of the following-->
<!--on server shutdown scale down to another live server-->

View File

@ -193,6 +193,13 @@
<discovery-group-ref discovery-group-name="dg1"/>
</bridge>
</bridges>
<metrics-plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>
<property key="baz" value="z"/>
</metrics-plugin>
<ha-policy>
<!--only one of the following-->
<!--on server shutdown scale down to another live server-->

View File

@ -1012,6 +1012,33 @@
</xsd:complexType>
</xsd:element>
<xsd:element name="metrics-plugin" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:annotation>
<xsd:documentation>
a metrics plugin
</xsd:documentation>
</xsd:annotation>
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
properties to configure a plugin
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="class-name" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
the name of the metrics plugin class to instantiate
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="address-settings" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -151,7 +151,8 @@ public class WebServerComponent implements ExternalComponent {
}
handlers.addHandler(homeContext);
handlers.addHandler(instanceContext);
handlers.addHandler(defaultHandler);
handlers.addHandler(defaultHandler); // this should be last
server.setHandler(handlers);
}

View File

@ -45,6 +45,7 @@
* [Extra Acknowledge Modes](pre-acknowledge.md)
* [Management](management.md)
* [Management Console](management-console.md)
* [Metrics](metrics.md)
* [Security](security.md)
* [Masking Passwords](masking-passwords.md)
* [Broker Plugins](broker-plugins.md)

View File

@ -154,6 +154,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
[message-counter-sample-period](management.md#message-counters) | the sample period (in ms) to use for message counters. | 10000
[message-expiry-scan-period](message-expiry.md#configuring-the-expiry-reaper-thread) | how often (in ms) to scan for expired messages. | 30000
[message-expiry-thread-priority](message-expiry.md#configuring-the-expiry-reaper-thread)| the priority of the thread expiring messages. | 3
[metrics-plugin](metrics.md) | [a plugin to export metrics](#metrics-plugin-type) | n/a
[address-queue-scan-period](address-model.md#configuring-addresses-and-queues-via-address-settings) | how often (in ms) to scan for addresses & queues that should be removed. | 30000
name | node name; used in topology notifications if set. | n/a
[password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
@ -379,6 +380,14 @@ Name | Description
[class-name](broker-plugins.md#registering-a-plugin) | the name of the broker plugin class to instantiate
## metrics-plugin type
Name | Description
---|---
[property](metrics.md)| properties to configure a plugin
[class-name](metrics.md) | the name of the metrics plugin class to instantiate
## resource-limit type
Name | Description | Default

View File

@ -0,0 +1,105 @@
# Metrics
Apache ActiveMQ Artemis can export metrics to a variety of monitoring systems
via the [Micrometer](https://micrometer.io/) vendor-neutral application metrics
facade.
Important runtime metrics have been instrumented via the Micrometer API, and
all a user needs to do is implement `org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin`
in order to instantiate and configure a `io.micrometer.core.instrument.MeterRegistry`
implementation. Relevant implementations of `MeterRegistry` are available from
the [Micrometer code-base](https://github.com/micrometer-metrics/micrometer/tree/master/implementations).
This is a simple interface:
```java
public interface ActiveMQMetricsPlugin extends Serializable {
ActiveMQMetricsPlugin init(Map<String, String> options);
MeterRegistry getRegistry();
}
```
When the broker starts it will call `init` and pass in the `options` which can
be specified in XML as key/value properties. At this point the plugin should
instantiate and configure the `io.micrometer.core.instrument.MeterRegistry`
implementation.
Later during the broker startup process it will call `getRegistry` in order to
get the `MeterRegistry` implementation and use it for registering meters.
The broker ships with two `ActiveMQMetricsPlugin` implementations:
- `org.apache.activemq.artemis.core.server.metrics.plugins.LoggingMetricsPlugin`
This plugin simply logs metrics. It's not very useful for production, but can
serve as a demonstration of the Micrometer integration. It takes no key/value
properties for configuration.
- `org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin`
This plugin is used for testing. It is in-memory only and provides no external
output. It takes no key/value properties for configuration.
## Metrics
The following metrics are exported, categorized by component. A description for
each metric is exported along with the metric itself therefore the description
will not be repeated here.
**Broker**
- connection.count
- total.connection.count
- address.memory.usage
**Address**
- routed.message.count
- unrouted.message.count
**Queue**
- message.count
- durable.message.count
- persistent.size
- durable.persistent.size
- delivering.message.count
- delivering.durable.message.count
- delivering.persistent.size
- delivering.durable.persistent.size
- scheduled.message.count
- scheduled.durable.message.count
- scheduled.persistent.size
- scheduled.durable.persistent.size
- messages.acknowledged
- messages.added
- messages.killed
- messages.expired
- consumer.count
It may appear that some higher level broker metrics are missing (e.g. total
message count). However, these metrics can be deduced by aggregating the
lower level metrics (e.g. aggregate the message.count metrics from all queues
to get the total).
JVM memory metrics are exported as well.
## Configuration
In `broker.xml` use the `metrics-plugin` element and specify the `class-name`
attribute to configure your plugin, e.g.:
```xml
<metrics-plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.LoggingMetricsPlugin" />
```
As noted, the plugin can also be configured with key/value properties in order
to customize its behavior as necessary, e.g.:
```xml
<metrics-plugin class-name="org.example.MyMetricsPlugin">
<property key="host" value="example.org" />
<property key="port" value="5162" />
<property key="foo" value="10" />
</metrics-plugin>
```

10
pom.xml
View File

@ -116,6 +116,7 @@
<version.org.jacoco>0.7.9</version.org.jacoco>
<version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
<version.maven.jar.plugin>2.4</version.maven.jar.plugin>
<version.micrometer>1.1.4</version.micrometer>
<!-- used on tests -->
<groovy.version>2.4.3</groovy.version>
@ -703,6 +704,15 @@
<version>2.7.2</version>
</dependency>
<!-- Needed for Micrometer -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${version.micrometer}</version>
<optional>true</optional> <!-- keep optional as "true" at least until micrometer supports OSGi -->
<!-- license Apache 2 -->
</dependency>
<dependency>
<groupId>org.apache.openwebbeans</groupId>
<artifactId>openwebbeans-impl</artifactId>

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class MetricsPluginTest extends ActiveMQTestBase {
protected ActiveMQServer server;
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(false, createDefaultInVMConfig());
server.getConfiguration().setMetricsPlugin(new SimpleMetricsPlugin().init(null));
server.start();
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(false, true, true));
}
@Test
public void testForBasicMetricsPresenceAndValue() throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";
session.createQueue(addressName, RoutingType.ANYCAST, queueName, null, true);
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(data);
producer.send(message);
producer.close();
Map<String, Double> metrics = getMetrics();
checkMetric(metrics, "'artemis.message.count'", queueName, 1.0);
checkMetric(metrics, "'artemis.messages.added'", queueName, 1.0);
checkMetric(metrics, "'artemis.messages.acknowledged'", queueName, 0.0);
checkMetric(metrics, "'artemis.durable.message.count'", queueName, 1.0);
checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.routed.message.count'", addressName, 1.0);
checkMetric(metrics, "'artemis.unrouted.message.count'", addressName, 0.0);
checkMetric(metrics, "'artemis.consumer.count'", queueName, 0.0);
ClientConsumer consumer = session.createConsumer(queueName);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
metrics = getMetrics();
checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 1.0);
checkMetric(metrics, "'artemis.consumer.count'", queueName, 1.0);
message.acknowledge();
assertEquals(data, message.getBodyBuffer().readString());
session.commit(); // force the ack to be committed
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)).getMessagesAcknowledged() == 1, 1000, 100));
consumer.close();
metrics = getMetrics();
checkMetric(metrics, "'artemis.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.messages.added'", queueName, 1.0);
checkMetric(metrics, "'artemis.messages.acknowledged'", queueName, 1.0);
checkMetric(metrics, "'artemis.durable.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.delivering.message.count'", queueName, 0.0);
checkMetric(metrics, "'artemis.routed.message.count'", addressName, 1.0);
checkMetric(metrics, "'artemis.unrouted.message.count'", addressName, 0.0);
checkMetric(metrics, "'artemis.consumer.count'", queueName, 0.0);
}
public Map<String, Double> getMetrics() {
Map<String, Double> metrics = new HashMap<>();
List<Meter> meters = server.getMetricsManager().getMeterRegistry().getMeters();
assertTrue(meters.size() > 0);
for (Meter meter : meters) {
Iterable<Measurement> measurements = meter.measure();
for (Measurement measurement : measurements) {
metrics.put(meter.getId().toString(), measurement.getValue());
// if (meter.getId().getName().startsWith("artemis")) {
// IntegrationTestLogger.LOGGER.info(meter.getId().toString() + ": " + measurement.getValue() + " (" + meter.getId().getDescription() + ")");
// }
}
}
return metrics;
}
public void checkMetric(Map<String, Double> metrics, String metric, String tag, Double value) {
boolean contains = false;
for (Map.Entry<String, Double> entry : metrics.entrySet()) {
if (entry.getKey().contains(metric) && entry.getKey().contains(tag)) {
contains = true;
assertEquals(metric + " not equal", value, entry.getValue(), 0);
break;
}
}
assertTrue(contains);
}
}

View File

@ -49,7 +49,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
@Test
public void testUnitOnWildCardFailingScenario() throws Exception {
int errors = 0;
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addBinding(new BindingFake("Topic1", "Topic1"));
ad.addBinding(new BindingFake("Topic1", "one"));
ad.addBinding(new BindingFake("*", "two"));
@ -79,7 +79,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
@Test
public void testUnitOnWildCardFailingScenarioFQQN() throws Exception {
int errors = 0;
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addBinding(new BindingFake("Topic1", "Topic1"));
ad.addBinding(new BindingFake("Topic1", "one"));
ad.addBinding(new BindingFake("*", "two"));
@ -114,7 +114,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
@Test
public void testWildCardAddressRemoval() throws Exception {
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null);
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Queue1.#"), RoutingType.ANYCAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.#"), RoutingType.MULTICAST));
ad.addBinding(new BindingFake("Topic1.topic", "two"));
@ -142,7 +142,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null);
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
ad.addBinding(new BindingFake("Topic1.>", "one"));
@ -171,7 +171,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
final WildcardConfiguration configuration = new WildcardConfiguration();
configuration.setAnyWords('>');
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null);
WildcardAddressManager ad = new WildcardAddressManager(new BindingFactoryFake(), configuration, null, null);
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.>"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test"), RoutingType.MULTICAST));
ad.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("Topic1.test.test1"), RoutingType.MULTICAST));