ARTEMIS-2834 leaking meters

Move all of the meter registration code into the ManagementServer
implementation to provide better life-cycle management.
This commit is contained in:
Justin Bertram 2020-07-01 13:19:03 -05:00 committed by Clebert Suconic
parent fd207e75b0
commit f5d5710133
5 changed files with 167 additions and 100 deletions

View File

@ -60,7 +60,6 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -154,9 +153,7 @@ import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@ -165,6 +162,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
@ -1173,7 +1171,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
stopComponent(managementService); stopComponent(managementService);
unregisterMeters();
stopComponent(resourceManager); stopComponent(resourceManager);
stopComponent(postOffice); stopComponent(postOffice);
@ -2893,8 +2890,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository); metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository);
} }
registerMeters();
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository); postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
// This can't be created until node id is set // This can't be created until node id is set
@ -3078,19 +3073,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callActivationCompleteCallbacks(); callActivationCompleteCallbacks();
} }
private void registerMeters() {
MetricsManager metricsManager = this.metricsManager; // volatile load
if (metricsManager != null) {
metricsManager.registerBrokerGauge(builder -> {
builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
});
}
}
@Override @Override
public double getDiskStoreUsage() { public double getDiskStoreUsage() {
//this should not happen but if it does, return -1 to highlight it is not working //this should not happen but if it does, return -1 to highlight it is not working
@ -3101,13 +3083,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return FileStoreMonitor.calculateUsage(getPagingManager().getDiskUsableSpace(), getPagingManager().getDiskTotalSpace()); return FileStoreMonitor.calculateUsage(getPagingManager().getDiskUsableSpace(), getPagingManager().getDiskTotalSpace());
} }
private void unregisterMeters() {
MetricsManager metricsManager = this.metricsManager; // volatile load
if (metricsManager != null) {
metricsManager.remove(ResourceNames.BROKER + "." + configuration.getName());
}
}
private void deploySecurityFromConfiguration() { private void deploySecurityFromConfiguration() {
for (Map.Entry<String, Set<Role>> entry : configuration.getSecurityRoles().entrySet()) { for (Map.Entry<String, Set<Role>> entry : configuration.getSecurityRoles().entrySet()) {
securityRepository.addMatch(entry.getKey(), entry.getValue(), true); securityRepository.addMatch(entry.getKey(), entry.getValue(), true);

View File

@ -52,8 +52,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@ -63,8 +61,8 @@ import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -90,8 +88,6 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener; import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -689,7 +685,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.factory = factory; this.factory = factory;
registerMeters();
if (this.addressInfo != null && this.addressInfo.isPaused()) { if (this.addressInfo != null && this.addressInfo.isPaused()) {
this.pause(false); this.pause(false);
} }
@ -2264,8 +2259,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
slowConsumerReaperFuture.cancel(false); slowConsumerReaperFuture.cancel(false);
} }
unregisterMeters();
tx.commit(); tx.commit();
} catch (Exception e) { } catch (Exception e) {
tx.rollback(); tx.rollback();
@ -4346,43 +4339,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
private void registerMeters() {
if (server != null && server.getMetricsManager() != null) {
String addressName = address.toString();
String queueName = name.toString();
MetricsManager metricsManager = server.getMetricsManager();
metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {
builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
});
}
}
private void unregisterMeters() {
if (server != null && server.getMetricsManager() != null) {
server.getMetricsManager().remove(ResourceNames.QUEUE + name);
}
}
private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener { private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener {
@Override @Override

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.AcceptorControl; import org.apache.activemq.artemis.api.core.management.AcceptorControl;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.BridgeControl;
@ -54,6 +55,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@ -95,7 +97,9 @@ import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames; import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -215,15 +219,30 @@ public class ManagementServiceImpl implements ManagementService {
ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName(); ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
registerInJMX(objectName, messagingServerControl); registerInJMX(objectName, messagingServerControl);
registerInRegistry(ResourceNames.BROKER, messagingServerControl); registerInRegistry(ResourceNames.BROKER, messagingServerControl);
registerBrokerMeters();
return messagingServerControl; return messagingServerControl;
} }
private void registerBrokerMeters() {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerBrokerGauge(builder -> {
builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
});
}
}
@Override @Override
public synchronized void unregisterServer() throws Exception { public synchronized void unregisterServer() throws Exception {
ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName(); ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
unregisterFromJMX(objectName); unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.BROKER); unregisterFromRegistry(ResourceNames.BROKER);
unregisterMeters(ResourceNames.BROKER + "." + messagingServer.getConfiguration().getName());
} }
@Override @Override
@ -260,7 +279,7 @@ public class ManagementServiceImpl implements ManagementService {
unregisterFromJMX(objectName); unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.ADDRESS + address); unregisterFromRegistry(ResourceNames.ADDRESS + address);
unregisterAddressMeters(address.toString()); unregisterMeters(ResourceNames.ADDRESS + address);
} }
public synchronized void registerQueue(final Queue queue, public synchronized void registerQueue(final Queue queue,
@ -283,11 +302,13 @@ public class ManagementServiceImpl implements ManagementService {
ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType()); ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType());
registerInJMX(objectName, queueControl); registerInJMX(objectName, queueControl);
registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl); registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
registerQueueMeters(queue);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("registered queue " + objectName); logger.debug("registered queue " + objectName);
} }
} }
@Override @Override
public synchronized void registerQueue(final Queue queue, public synchronized void registerQueue(final Queue queue,
final SimpleString address, final SimpleString address,
@ -300,11 +321,47 @@ public class ManagementServiceImpl implements ManagementService {
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType); ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
unregisterFromJMX(objectName); unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.QUEUE + name); unregisterFromRegistry(ResourceNames.QUEUE + name);
unregisterMeters(ResourceNames.QUEUE + name);
if (messageCounterManager != null) { if (messageCounterManager != null) {
messageCounterManager.unregisterMessageCounter(name.toString()); messageCounterManager.unregisterMessageCounter(name.toString());
} }
} }
private void registerQueueMeters(final Queue queue) {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
});
}
}
private void unregisterMeters(final String name) {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.remove(name);
}
}
@Override @Override
public synchronized void registerDivert(final Divert divert) throws Exception { public synchronized void registerDivert(final Divert divert) throws Exception {
ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString()); ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString());
@ -560,13 +617,6 @@ public class ManagementServiceImpl implements ManagementService {
} }
} }
public void unregisterAddressMeters(String address) {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.remove(ResourceNames.ADDRESS + address);
}
}
@Override @Override
public void addNotificationListener(final NotificationListener listener) { public void addNotificationListener(final NotificationListener listener) {
listeners.add(listener); listeners.add(listener);
@ -622,6 +672,7 @@ public class ManagementServiceImpl implements ManagementService {
for (String resourceName : resourceNames) { for (String resourceName : resourceNames) {
unregisterFromRegistry(resourceName); unregisterFromRegistry(resourceName);
unregisterMeters(resourceName);
} }
if (jmxManagementEnabled) { if (jmxManagementEnabled) {

View File

@ -33,12 +33,14 @@ import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.MetricsConfiguration; import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.jboss.logging.Logger;
public class MetricsManager { public class MetricsManager {
private static final Logger log = Logger.getLogger(MetricsManager.class);
private final String brokerName; private final String brokerName;
private final MeterRegistry meterRegistry; private final MeterRegistry meterRegistry;
@ -91,14 +93,7 @@ public class MetricsManager {
newMeters.add(meter); newMeters.add(meter);
}); });
final String resource = ResourceNames.QUEUE + queue; final String resource = ResourceNames.QUEUE + queue;
this.meters.compute(resource, (s, meters) -> { registerMeter(newMeters, resource);
//the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size());
for (Gauge.Builder gauge : newMeters) {
meters.add(gauge.register(meterRegistry));
}
return meters;
});
} }
public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) { public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
@ -116,14 +111,7 @@ public class MetricsManager {
newMeters.add(meter); newMeters.add(meter);
}); });
final String resource = ResourceNames.ADDRESS + address; final String resource = ResourceNames.ADDRESS + address;
this.meters.compute(resource, (s, meters) -> { registerMeter(newMeters, resource);
//the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size());
for (Gauge.Builder gauge : newMeters) {
meters.add(gauge.register(meterRegistry));
}
return meters;
});
} }
public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) { public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
@ -140,11 +128,19 @@ public class MetricsManager {
newMeters.add(meter); newMeters.add(meter);
}); });
final String resource = ResourceNames.BROKER + "." + brokerName; final String resource = ResourceNames.BROKER + "." + brokerName;
registerMeter(newMeters, resource);
}
private void registerMeter(List<Gauge.Builder> newMeters, String resource) {
this.meters.compute(resource, (s, meters) -> { this.meters.compute(resource, (s, meters) -> {
//the old meters are ignored on purpose //the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size()); meters = new ArrayList<>(newMeters.size());
for (Gauge.Builder gauge : newMeters) { for (Gauge.Builder gaugeBuilder : newMeters) {
meters.add(gauge.register(meterRegistry)); Gauge gauge = gaugeBuilder.register(meterRegistry);
meters.add(gauge);
if (log.isDebugEnabled()) {
log.debug("Registered meter: " + gauge.getId());
}
} }
return meters; return meters;
}); });
@ -157,8 +153,8 @@ public class MetricsManager {
} }
for (Meter meter : meters) { for (Meter meter : meters) {
Meter removed = meterRegistry.remove(meter); Meter removed = meterRegistry.remove(meter);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { if (log.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("Removed meter: " + removed.getId()); log.debug("Unregistered meter: " + removed.getId());
} }
} }
return null; return null;

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
public class SharedStoreMetricsLeakTest extends ClusterTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
setupServers();
}
private void setupServers() throws Exception {
setupLiveServer(0, isFileStorage(), true, isNetty(), false);
setupBackupServer(1, 0, isFileStorage(), true, isNetty());
getServer(0).getConfiguration().setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true));
getServer(0).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null)));
getServer(1).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailoverOnServerShutdown(true).setAllowFailBack(true));
getServer(1).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null)));
// configure cluster for bother servers
setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
}
private boolean isNetty() {
return true;
}
@Test
public void testForMeterLeaks() throws Exception {
ActiveMQServer live = getServer(0);
ActiveMQServer backup = getServer(1);
live.start();
assertTrue(live.waitForActivation(5, TimeUnit.SECONDS));
backup.start();
assertFalse(backup.waitForActivation(1, TimeUnit.SECONDS));
// there should be a handful of metrics available from the ActiveMQServerImpl itself
long baseline = backup.getMetricsManager().getMeterRegistry().getMeters().size();
live.stop();
assertTrue(backup.waitForActivation(5, TimeUnit.SECONDS));
// after failover more meters should get registered
Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() > baseline, 2000, 100);
live.start();
assertTrue(live.waitForActivation(5, TimeUnit.SECONDS));
// after failback the number of registered meters should return to baseline
Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() == baseline, 2000, 100);
live.stop();
backup.stop();
}
}