diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 41edad9f3f..fe25fe1def 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -60,7 +60,6 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -154,9 +153,7 @@ import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler; import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; -import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -165,6 +162,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; @@ -1173,7 +1171,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } stopComponent(managementService); - unregisterMeters(); stopComponent(resourceManager); stopComponent(postOffice); @@ -2893,8 +2890,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository); } - registerMeters(); - postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository); // This can't be created until node id is set @@ -3078,19 +3073,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { callActivationCompleteCallbacks(); } - private void registerMeters() { - MetricsManager metricsManager = this.metricsManager; // volatile load - if (metricsManager != null) { - metricsManager.registerBrokerGauge(builder -> { - builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION); - builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION); - builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION); - builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION); - builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION); - }); - } - } - @Override public double getDiskStoreUsage() { //this should not happen but if it does, return -1 to highlight it is not working @@ -3101,13 +3083,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { return FileStoreMonitor.calculateUsage(getPagingManager().getDiskUsableSpace(), getPagingManager().getDiskTotalSpace()); } - private void unregisterMeters() { - MetricsManager metricsManager = this.metricsManager; // volatile load - if (metricsManager != null) { - metricsManager.remove(ResourceNames.BROKER + "." + configuration.getName()); - } - } - private void deploySecurityFromConfiguration() { for (Map.Entry> entry : configuration.getSecurityRoles().entrySet()) { securityRepository.addMatch(entry.getKey(), entry.getValue(), true); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index d2e7d88850..040e4a3fb6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -52,8 +52,6 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.api.core.management.QueueControl; -import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; @@ -63,8 +61,8 @@ import org.apache.activemq.artemis.core.paging.cursor.PageIterator; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; -import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.AddressQueueStatus; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -90,8 +88,6 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; -import org.apache.activemq.artemis.core.server.metrics.MetricsManager; -import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -689,7 +685,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.factory = factory; - registerMeters(); if (this.addressInfo != null && this.addressInfo.isPaused()) { this.pause(false); } @@ -2264,8 +2259,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { slowConsumerReaperFuture.cancel(false); } - unregisterMeters(); - tx.commit(); } catch (Exception e) { tx.rollback(); @@ -4346,43 +4339,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private void registerMeters() { - if (server != null && server.getMetricsManager() != null) { - String addressName = address.toString(); - String queueName = name.toString(); - MetricsManager metricsManager = server.getMetricsManager(); - - metricsManager.registerQueueGauge(addressName, queueName, (builder) -> { - builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION); - builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION); - - builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION); - builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION); - - builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION); - builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION); - builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION); - - builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION); - builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION); - builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION); - builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION); - builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION); - }); - } - } - - private void unregisterMeters() { - if (server != null && server.getMetricsManager() != null) { - server.getMetricsManager().remove(ResourceNames.QUEUE + name); - } - } - private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener { @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 207a2da2c0..6e8667743a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.management.AcceptorControl; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BridgeControl; @@ -54,6 +55,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; @@ -95,7 +97,9 @@ import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames; +import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; +import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.ResourceManager; @@ -215,15 +219,30 @@ public class ManagementServiceImpl implements ManagementService { ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName(); registerInJMX(objectName, messagingServerControl); registerInRegistry(ResourceNames.BROKER, messagingServerControl); + registerBrokerMeters(); return messagingServerControl; } + private void registerBrokerMeters() { + MetricsManager metricsManager = messagingServer.getMetricsManager(); + if (metricsManager != null) { + metricsManager.registerBrokerGauge(builder -> { + builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION); + builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION); + builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION); + builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION); + builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION); + }); + } + } + @Override public synchronized void unregisterServer() throws Exception { ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName(); unregisterFromJMX(objectName); unregisterFromRegistry(ResourceNames.BROKER); + unregisterMeters(ResourceNames.BROKER + "." + messagingServer.getConfiguration().getName()); } @Override @@ -260,7 +279,7 @@ public class ManagementServiceImpl implements ManagementService { unregisterFromJMX(objectName); unregisterFromRegistry(ResourceNames.ADDRESS + address); - unregisterAddressMeters(address.toString()); + unregisterMeters(ResourceNames.ADDRESS + address); } public synchronized void registerQueue(final Queue queue, @@ -283,11 +302,13 @@ public class ManagementServiceImpl implements ManagementService { ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType()); registerInJMX(objectName, queueControl); registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl); + registerQueueMeters(queue); if (logger.isDebugEnabled()) { logger.debug("registered queue " + objectName); } } + @Override public synchronized void registerQueue(final Queue queue, final SimpleString address, @@ -300,11 +321,47 @@ public class ManagementServiceImpl implements ManagementService { ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType); unregisterFromJMX(objectName); unregisterFromRegistry(ResourceNames.QUEUE + name); + unregisterMeters(ResourceNames.QUEUE + name); if (messageCounterManager != null) { messageCounterManager.unregisterMessageCounter(name.toString()); } } + private void registerQueueMeters(final Queue queue) { + MetricsManager metricsManager = messagingServer.getMetricsManager(); + if (metricsManager != null) { + metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> { + builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION); + + builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION); + + builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION); + builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION); + builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION); + + builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION); + builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION); + builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION); + }); + } + } + + private void unregisterMeters(final String name) { + MetricsManager metricsManager = messagingServer.getMetricsManager(); + if (metricsManager != null) { + metricsManager.remove(name); + } + } + @Override public synchronized void registerDivert(final Divert divert) throws Exception { ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString()); @@ -560,13 +617,6 @@ public class ManagementServiceImpl implements ManagementService { } } - public void unregisterAddressMeters(String address) { - MetricsManager metricsManager = messagingServer.getMetricsManager(); - if (metricsManager != null) { - metricsManager.remove(ResourceNames.ADDRESS + address); - } - } - @Override public void addNotificationListener(final NotificationListener listener) { listeners.add(listener); @@ -622,6 +672,7 @@ public class ManagementServiceImpl implements ManagementService { for (String resourceName : resourceNames) { unregisterFromRegistry(resourceName); + unregisterMeters(resourceName); } if (jmxManagementEnabled) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java index fb177bc93b..8fab39c09b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java @@ -33,12 +33,14 @@ import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.MetricsConfiguration; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.jboss.logging.Logger; public class MetricsManager { + private static final Logger log = Logger.getLogger(MetricsManager.class); + private final String brokerName; private final MeterRegistry meterRegistry; @@ -91,14 +93,7 @@ public class MetricsManager { newMeters.add(meter); }); final String resource = ResourceNames.QUEUE + queue; - this.meters.compute(resource, (s, meters) -> { - //the old meters are ignored on purpose - meters = new ArrayList<>(newMeters.size()); - for (Gauge.Builder gauge : newMeters) { - meters.add(gauge.register(meterRegistry)); - } - return meters; - }); + registerMeter(newMeters, resource); } public void registerAddressGauge(String address, Consumer builder) { @@ -116,14 +111,7 @@ public class MetricsManager { newMeters.add(meter); }); final String resource = ResourceNames.ADDRESS + address; - this.meters.compute(resource, (s, meters) -> { - //the old meters are ignored on purpose - meters = new ArrayList<>(newMeters.size()); - for (Gauge.Builder gauge : newMeters) { - meters.add(gauge.register(meterRegistry)); - } - return meters; - }); + registerMeter(newMeters, resource); } public void registerBrokerGauge(Consumer builder) { @@ -140,11 +128,19 @@ public class MetricsManager { newMeters.add(meter); }); final String resource = ResourceNames.BROKER + "." + brokerName; + registerMeter(newMeters, resource); + } + + private void registerMeter(List newMeters, String resource) { this.meters.compute(resource, (s, meters) -> { //the old meters are ignored on purpose meters = new ArrayList<>(newMeters.size()); - for (Gauge.Builder gauge : newMeters) { - meters.add(gauge.register(meterRegistry)); + for (Gauge.Builder gaugeBuilder : newMeters) { + Gauge gauge = gaugeBuilder.register(meterRegistry); + meters.add(gauge); + if (log.isDebugEnabled()) { + log.debug("Registered meter: " + gauge.getId()); + } } return meters; }); @@ -157,8 +153,8 @@ public class MetricsManager { } for (Meter meter : meters) { Meter removed = meterRegistry.remove(meter); - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("Removed meter: " + removed.getId()); + if (log.isDebugEnabled()) { + log.debug("Unregistered meter: " + removed.getId()); } } return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java new file mode 100755 index 0000000000..43c904bda1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.config.MetricsConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Before; +import org.junit.Test; + +public class SharedStoreMetricsLeakTest extends ClusterTestBase { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + setupServers(); + } + + private void setupServers() throws Exception { + setupLiveServer(0, isFileStorage(), true, isNetty(), false); + setupBackupServer(1, 0, isFileStorage(), true, isNetty()); + + getServer(0).getConfiguration().setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true)); + getServer(0).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null))); + getServer(1).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailoverOnServerShutdown(true).setAllowFailBack(true)); + getServer(1).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null))); + + // configure cluster for bother servers + setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + } + + private boolean isNetty() { + return true; + } + + @Test + public void testForMeterLeaks() throws Exception { + ActiveMQServer live = getServer(0); + ActiveMQServer backup = getServer(1); + + live.start(); + assertTrue(live.waitForActivation(5, TimeUnit.SECONDS)); + + backup.start(); + assertFalse(backup.waitForActivation(1, TimeUnit.SECONDS)); + + // there should be a handful of metrics available from the ActiveMQServerImpl itself + long baseline = backup.getMetricsManager().getMeterRegistry().getMeters().size(); + + live.stop(); + assertTrue(backup.waitForActivation(5, TimeUnit.SECONDS)); + + // after failover more meters should get registered + Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() > baseline, 2000, 100); + + live.start(); + assertTrue(live.waitForActivation(5, TimeUnit.SECONDS)); + + // after failback the number of registered meters should return to baseline + Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() == baseline, 2000, 100); + + live.stop(); + backup.stop(); + } +}