ARTEMIS-4306 add authn/z cache metrics

This commit includes the following changes:

 - Management operations to get sucess & failure counts for authn and
   authz along with the corresponding audit logging.
 - Export the aforementioned authn & authz metrics.
 - Export metrics for the underlying authn & authz caches including the
   ability to enable/disable them.
 - Update metrics tests to validate tags in addition to keys and values.
 - Update documentation to explain new functionality and clarify
   existing metric tags.
This commit is contained in:
Justin Bertram 2023-06-08 15:49:18 -05:00 committed by clebertsuconic
parent 58fc7cbea1
commit acc64b184e
24 changed files with 502 additions and 128 deletions

View File

@ -2716,4 +2716,32 @@ public interface AuditLogger {
@LogMessage(id = 601777, value = "User {} is getting broker plugin class names on target resource: {}", level = LogMessage.Level.INFO)
void getBrokerPluginClassNames(String user, Object source);
static void getAuthenticationSuccessCount(Object source) {
BASE_LOGGER.getAuthenticationSuccessCount(getCaller(), source);
}
@LogMessage(id = 601778, value = "User {} is getting authentication success count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthenticationSuccessCount(String user, Object source);
static void getAuthenticationFailureCount(Object source) {
BASE_LOGGER.getAuthenticationFailureCount(getCaller(), source);
}
@LogMessage(id = 601779, value = "User {} is getting authentication failure count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthenticationFailureCount(String user, Object source);
static void getAuthorizationSuccessCount(Object source) {
BASE_LOGGER.getAuthorizationSuccessCount(getCaller(), source);
}
@LogMessage(id = 601780, value = "User {} is getting authorization success count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthorizationSuccessCount(String user, Object source);
static void getAuthorizationFailureCount(Object source) {
BASE_LOGGER.getAuthorizationFailureCount(getCaller(), source);
}
@LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthorizationFailureCount(String user, Object source);
}

View File

@ -679,6 +679,9 @@ public final class ActiveMQDefaultConfiguration {
// Whether or not to report logging metrics
private static final boolean DEFAULT_LOGGING_METRICS = false;
// Whether or not to report security cache metrics
private static final boolean DEFAULT_SECURITY_CACHE_METRICS = false;
// How often (in ms) to scan for expired MQTT sessions
private static long DEFAULT_MQTT_SESSION_SCAN_INTERVAL = 500;
@ -1892,6 +1895,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_LOGGING_METRICS;
}
/**
* Whether or not to report security cache metrics
*/
public static Boolean getDefaultSecurityCacheMetrics() {
return DEFAULT_SECURITY_CACHE_METRICS;
}
/**
* How often (in ms) to scan for expired MQTT sessions
*/

View File

@ -33,6 +33,10 @@ public interface ActiveMQServerControl {
String DISK_STORE_USAGE_DESCRIPTION = "Fraction of total disk store used";
String REPLICA_SYNC_DESCRIPTION = "If the initial replication synchronization process is complete";
String IS_ACTIVE_DESCRIPTION = "If the server is active";
String AUTHENTICATION_SUCCESS_COUNT = "Number of successful authentication attempts";
String AUTHENTICATION_FAILURE_COUNT = "Number of failed authentication attempts";
String AUTHORIZATION_SUCCESS_COUNT = "Number of successful authorization attempts";
String AUTHORIZATION_FAILURE_COUNT = "Number of failed authorization attempts";
/**
* Returns this server's name.
@ -2072,5 +2076,17 @@ public interface ActiveMQServerControl {
@Operation(desc = "Clear the authorization cache", impact = MBeanOperationInfo.ACTION)
void clearAuthorizationCache() throws Exception;
@Attribute(desc = AUTHENTICATION_SUCCESS_COUNT)
long getAuthenticationSuccessCount();
@Attribute(desc = AUTHENTICATION_FAILURE_COUNT)
long getAuthenticationFailureCount();
@Attribute(desc = AUTHORIZATION_SUCCESS_COUNT)
long getAuthorizationSuccessCount();
@Attribute(desc = AUTHORIZATION_FAILURE_COUNT)
long getAuthorizationFailureCount();
}

View File

@ -31,6 +31,7 @@ public class MetricsConfiguration implements Serializable {
private boolean processor = ActiveMQDefaultConfiguration.getDefaultProcessorMetrics();
private boolean uptime = ActiveMQDefaultConfiguration.getDefaultUptimeMetrics();
private boolean logging = ActiveMQDefaultConfiguration.getDefaultLoggingMetrics();
private boolean securityCaches = ActiveMQDefaultConfiguration.getDefaultSecurityCacheMetrics();
private ActiveMQMetricsPlugin plugin;
public boolean isJvmMemory() {
@ -113,4 +114,13 @@ public class MetricsConfiguration implements Serializable {
this.plugin = plugin;
return this;
}
public boolean isSecurityCaches() {
return securityCaches;
}
public MetricsConfiguration setSecurityCaches(boolean securityCaches) {
this.securityCaches = securityCaches;
return this;
}
}

View File

@ -1040,6 +1040,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
metricsConfiguration.setUptime(XMLUtil.parseBoolean(child));
} else if (child.getNodeName().equals("logging")) {
metricsConfiguration.setLogging(XMLUtil.parseBoolean(child));
} else if (child.getNodeName().equals("security-caches")) {
metricsConfiguration.setSecurityCaches(XMLUtil.parseBoolean(child));
} else if (child.getNodeName().equals("plugin")) {
metricsConfiguration.setPlugin(parseMetricsPlugin(child, config));
}

View File

@ -4691,6 +4691,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
((SecurityStoreImpl)server.getSecurityStore()).invalidateAuthorizationCache();
}
@Override
public long getAuthenticationSuccessCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAuthenticationSuccessCount(this.server);
}
return server.getSecurityStore().getAuthenticationSuccessCount();
}
@Override
public long getAuthenticationFailureCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAuthenticationFailureCount(this.server);
}
return server.getSecurityStore().getAuthenticationFailureCount();
}
@Override
public long getAuthorizationSuccessCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAuthorizationSuccessCount(this.server);
}
return server.getSecurityStore().getAuthorizationSuccessCount();
}
@Override
public long getAuthorizationFailureCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAuthorizationFailureCount(this.server);
}
return server.getSecurityStore().getAuthorizationFailureCount();
}
public ActiveMQServer getServer() {
return server;
}

View File

@ -37,4 +37,12 @@ public interface SecurityStore {
void stop();
Subject getSessionSubject(SecurityAuth session);
long getAuthenticationSuccessCount();
long getAuthenticationFailureCount();
long getAuthorizationSuccessCount();
long getAuthorizationFailureCount();
}

View File

@ -17,11 +17,15 @@
package org.apache.activemq.artemis.core.security.impl;
import javax.security.auth.Subject;
import java.lang.invoke.MethodHandles;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
@ -52,11 +56,6 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.lang.invoke.MethodHandles;
/**
* The ActiveMQ Artemis SecurityStore implementation
*/
@ -80,6 +79,15 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
private final NotificationService notificationService;
private static final AtomicLongFieldUpdater<SecurityStoreImpl> AUTHENTICATION_SUCCESS_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(SecurityStoreImpl.class, "authenticationSuccessCount");
private volatile long authenticationSuccessCount;
private static final AtomicLongFieldUpdater<SecurityStoreImpl> AUTHENTICATION_FAILURE_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(SecurityStoreImpl.class, "authenticationFailureCount");
private volatile long authenticationFailureCount;
private static final AtomicLongFieldUpdater<SecurityStoreImpl> AUTHORIZATION_SUCCESS_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(SecurityStoreImpl.class, "authorizationSuccessCount");
private volatile long authorizationSuccessCount;
private static final AtomicLongFieldUpdater<SecurityStoreImpl> AUTHORIZATION_FAILURE_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(SecurityStoreImpl.class, "authorizationFailureCount");
private volatile long authorizationFailureCount;
/**
* @param notificationService can be <code>null</code>
@ -99,23 +107,30 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
this.managementClusterUser = managementClusterUser;
this.managementClusterPassword = managementClusterPassword;
this.notificationService = notificationService;
if (authenticationCacheSize == 0) {
if (securityEnabled) {
if (authenticationCacheSize == 0) {
authenticationCache = null;
} else {
authenticationCache = Caffeine.newBuilder()
.maximumSize(authenticationCacheSize)
.expireAfterWrite(invalidationInterval, TimeUnit.MILLISECONDS)
.recordStats()
.build();
}
if (authorizationCacheSize == 0) {
authorizationCache = null;
} else {
authorizationCache = Caffeine.newBuilder()
.maximumSize(authorizationCacheSize)
.expireAfterWrite(invalidationInterval, TimeUnit.MILLISECONDS)
.recordStats()
.build();
}
this.securityRepository.registerListener(this);
} else {
authenticationCache = null;
} else {
authenticationCache = Caffeine.newBuilder()
.maximumSize(authenticationCacheSize)
.expireAfterWrite(invalidationInterval, TimeUnit.MILLISECONDS)
.build();
}
if (authorizationCacheSize == 0) {
authorizationCache = null;
} else {
authorizationCache = Caffeine.newBuilder()
.maximumSize(authorizationCacheSize)
.expireAfterWrite(invalidationInterval, TimeUnit.MILLISECONDS)
.build();
}
this.securityRepository.registerListener(this);
}
// SecurityManager implementation --------------------------------
@ -157,8 +172,10 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
* operation between nodes
*/
if (!managementClusterPassword.equals(password)) {
AUTHENTICATION_FAILURE_COUNT_UPDATER.incrementAndGet(this);
throw ActiveMQMessageBundle.BUNDLE.unableToValidateClusterUser(user);
} else {
AUTHENTICATION_SUCCESS_COUNT_UPDATER.incrementAndGet(this);
return managementClusterUser;
}
}
@ -227,6 +244,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
}
}
AUTHENTICATION_SUCCESS_COUNT_UPDATER.incrementAndGet(this);
return validatedUser;
}
@ -254,6 +272,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
// bypass permission checks for management cluster user
String user = session.getUsername();
if (managementClusterUser.equals(user) && session.getPassword().equals(managementClusterPassword)) {
AUTHORIZATION_SUCCESS_COUNT_UPDATER.incrementAndGet(this);
return;
}
@ -271,6 +290,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
}
if (checkAuthorizationCache(fqqn != null ? fqqn : bareAddress, user, checkType)) {
AUTHORIZATION_SUCCESS_COUNT_UPDATER.incrementAndGet(this);
return;
}
@ -320,11 +340,14 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
ex = ActiveMQMessageBundle.BUNDLE.userNoPermissionsQueue(getCaller(user, session.getRemotingConnection().getSubject()), checkType, bareQueue, bareAddress);
}
AuditLogger.securityFailure(session.getRemotingConnection().getSubject(), session.getRemotingConnection().getRemoteAddress(), ex.getMessage(), ex);
AUTHORIZATION_FAILURE_COUNT_UPDATER.incrementAndGet(this);
throw ex;
}
// if we get here we're granted, add to the cache
AUTHORIZATION_SUCCESS_COUNT_UPDATER.incrementAndGet(this);
if (user == null) {
// should get all user/pass into a subject and only cache subjects
// till then when subject is in play, the user may be null and
@ -400,6 +423,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
AuditLogger.userFailedAuthenticationInAudit(null, e.getMessage(), connection == null ? "null" : connection.getID().toString());
}
AUTHENTICATION_FAILURE_COUNT_UPDATER.incrementAndGet(this);
throw e;
}
@ -521,13 +545,31 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
return user + "." + checkType.name();
}
// for testing
protected Cache<String, Pair<Boolean, Subject>> getAuthenticationCache() {
public Cache<String, Pair<Boolean, Subject>> getAuthenticationCache() {
return authenticationCache;
}
// for testing
protected Cache<String, ConcurrentHashSet<SimpleString>> getAuthorizationCache() {
public Cache<String, ConcurrentHashSet<SimpleString>> getAuthorizationCache() {
return authorizationCache;
}
@Override
public long getAuthenticationSuccessCount() {
return authenticationSuccessCount;
}
@Override
public long getAuthenticationFailureCount() {
return authenticationFailureCount;
}
@Override
public long getAuthorizationSuccessCount() {
return authorizationSuccessCount;
}
@Override
public long getAuthorizationFailureCount() {
return authorizationFailureCount;
}
}

View File

@ -3313,7 +3313,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* are not required to be included in the OSGi bundle and the Micrometer jars apparently don't support OSGi.
*/
if (configuration.getMetricsConfiguration() != null && configuration.getMetricsConfiguration().getPlugin() != null) {
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository);
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository, securityStore);
}
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);

View File

@ -25,7 +25,9 @@ import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -34,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Pattern;
import io.micrometer.core.instrument.Tag;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
@ -236,13 +239,17 @@ public class ManagementServiceImpl implements ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerBrokerGauge(builder -> {
builder.build(BrokerMetricNames.CONNECTION_COUNT, messagingServer, metrics -> (double) messagingServer.getConnectionCount(), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
builder.build(BrokerMetricNames.TOTAL_CONNECTION_COUNT, messagingServer, metrics -> (double) messagingServer.getTotalConnectionCount(), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE, messagingServer, metrics -> (double) messagingServerControl.getAddressMemoryUsage(), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, messagingServer, metrics -> (double) messagingServerControl.getAddressMemoryUsagePercentage(), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
builder.build(BrokerMetricNames.DISK_STORE_USAGE, messagingServer, metrics -> messagingServer.getDiskStoreUsage(), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
builder.build(BrokerMetricNames.REPLICA_SYNC, messagingServer, metrics -> messagingServer.isReplicaSync() ? 1D : 0D, ActiveMQServerControl.REPLICA_SYNC_DESCRIPTION);
builder.build(BrokerMetricNames.ACTIVE, messagingServer, metrics -> messagingServer.isActive() ? 1D : 0D, ActiveMQServerControl.IS_ACTIVE_DESCRIPTION);
builder.build(BrokerMetricNames.CONNECTION_COUNT, messagingServer, metrics -> (double) messagingServer.getConnectionCount(), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.TOTAL_CONNECTION_COUNT, messagingServer, metrics -> (double) messagingServer.getTotalConnectionCount(), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE, messagingServer, metrics -> (double) messagingServerControl.getAddressMemoryUsage(), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, messagingServer, metrics -> (double) messagingServerControl.getAddressMemoryUsagePercentage(), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.DISK_STORE_USAGE, messagingServer, metrics -> messagingServer.getDiskStoreUsage(), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.REPLICA_SYNC, messagingServer, metrics -> messagingServer.isReplicaSync() ? 1D : 0D, ActiveMQServerControl.REPLICA_SYNC_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.ACTIVE, messagingServer, metrics -> messagingServer.isActive() ? 1D : 0D, ActiveMQServerControl.IS_ACTIVE_DESCRIPTION, Collections.emptyList());
builder.build(BrokerMetricNames.AUTHENTICATION_COUNT, securityStore, metrics -> (double) securityStore.getAuthenticationSuccessCount(), ActiveMQServerControl.AUTHENTICATION_SUCCESS_COUNT, Arrays.asList(Tag.of("result", "success")));
builder.build(BrokerMetricNames.AUTHENTICATION_COUNT, securityStore, metrics -> (double) securityStore.getAuthenticationFailureCount(), ActiveMQServerControl.AUTHENTICATION_FAILURE_COUNT, Arrays.asList(Tag.of("result", "failure")));
builder.build(BrokerMetricNames.AUTHORIZATION_COUNT, securityStore, metrics -> (double) securityStore.getAuthorizationSuccessCount(), ActiveMQServerControl.AUTHORIZATION_SUCCESS_COUNT, Arrays.asList(Tag.of("result", "success")));
builder.build(BrokerMetricNames.AUTHORIZATION_COUNT, securityStore, metrics -> (double) securityStore.getAuthorizationFailureCount(), ActiveMQServerControl.AUTHORIZATION_FAILURE_COUNT, Arrays.asList(Tag.of("result", "failure")));
});
}
}
@ -277,10 +284,10 @@ public class ManagementServiceImpl implements ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerAddressGauge(addressInfo.getName().toString(), builder -> {
builder.build(AddressMetricNames.ROUTED_MESSAGE_COUNT, addressInfo, metrics -> (double) addressInfo.getRoutedMessageCount(), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
builder.build(AddressMetricNames.UNROUTED_MESSAGE_COUNT, addressInfo, metrics -> (double) addressInfo.getUnRoutedMessageCount(), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
builder.build(AddressMetricNames.ADDRESS_SIZE, addressInfo, metrics -> (double) addressControl.getAddressSize(), AddressControl.ADDRESS_SIZE_DESCRIPTION);
builder.build(AddressMetricNames.PAGES_COUNT, addressInfo, metrics -> (double) addressControl.getNumberOfPages(), AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
builder.build(AddressMetricNames.ROUTED_MESSAGE_COUNT, addressInfo, metrics -> (double) addressInfo.getRoutedMessageCount(), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(AddressMetricNames.UNROUTED_MESSAGE_COUNT, addressInfo, metrics -> (double) addressInfo.getUnRoutedMessageCount(), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(AddressMetricNames.ADDRESS_SIZE, addressInfo, metrics -> (double) addressControl.getAddressSize(), AddressControl.ADDRESS_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(AddressMetricNames.PAGES_COUNT, addressInfo, metrics -> (double) addressControl.getNumberOfPages(), AddressControl.NUMBER_OF_PAGES_DESCRIPTION, Collections.emptyList());
});
}
}
@ -336,26 +343,26 @@ public class ManagementServiceImpl implements ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
builder.build(QueueMetricNames.MESSAGE_COUNT, queue, metrics -> (double) queue.getMessageCount(), QueueControl.MESSAGE_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableMessageCount(), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.PERSISTENT_SIZE, queue, metrics -> (double) queue.getPersistentSize(), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
builder.build(QueueMetricNames.DURABLE_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDurablePersistentSize(), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
builder.build(QueueMetricNames.MESSAGE_COUNT, queue, metrics -> (double) queue.getMessageCount(), QueueControl.MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableMessageCount(), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.PERSISTENT_SIZE, queue, metrics -> (double) queue.getPersistentSize(), QueueControl.PERSISTENT_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DURABLE_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDurablePersistentSize(), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DELIVERING_MESSAGE_COUNT, queue, metrics -> (double) queue.getDeliveringCount(), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableDeliveringCount(), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDeliveringSize(), QueueControl.DELIVERING_SIZE_DESCRIPTION);
builder.build(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDurableDeliveringSize(), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
builder.build(QueueMetricNames.DELIVERING_MESSAGE_COUNT, queue, metrics -> (double) queue.getDeliveringCount(), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableDeliveringCount(), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDeliveringSize(), QueueControl.DELIVERING_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDurableDeliveringSize(), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, queue, metrics -> (double) queue.getScheduledCount(), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableScheduledCount(), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, queue, metrics -> (double) queue.getScheduledSize(), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
builder.build(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDurableScheduledSize(), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
builder.build(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, queue, metrics -> (double) queue.getScheduledCount(), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableScheduledCount(), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, queue, metrics -> (double) queue.getScheduledSize(), QueueControl.SCHEDULED_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, queue, metrics -> (double) queue.getDurableScheduledSize(), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.MESSAGES_ACKNOWLEDGED, queue, metrics -> (double) queue.getMessagesAcknowledged(), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
builder.build(QueueMetricNames.MESSAGES_ADDED, queue, metrics -> (double) queue.getMessagesAdded(), QueueControl.MESSAGES_ADDED_DESCRIPTION);
builder.build(QueueMetricNames.MESSAGES_KILLED, queue, metrics -> (double) queue.getMessagesKilled(), QueueControl.MESSAGES_KILLED_DESCRIPTION);
builder.build(QueueMetricNames.MESSAGES_EXPIRED, queue, metrics -> (double) queue.getMessagesExpired(), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
builder.build(QueueMetricNames.CONSUMER_COUNT, queue, metrics -> (double) queue.getConsumerCount(), QueueControl.CONSUMER_COUNT_DESCRIPTION);
builder.build(QueueMetricNames.MESSAGES_ACKNOWLEDGED, queue, metrics -> (double) queue.getMessagesAcknowledged(), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.MESSAGES_ADDED, queue, metrics -> (double) queue.getMessagesAdded(), QueueControl.MESSAGES_ADDED_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.MESSAGES_KILLED, queue, metrics -> (double) queue.getMessagesKilled(), QueueControl.MESSAGES_KILLED_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.MESSAGES_EXPIRED, queue, metrics -> (double) queue.getMessagesExpired(), QueueControl.MESSAGES_EXPIRED_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.CONSUMER_COUNT, queue, metrics -> (double) queue.getConsumerCount(), QueueControl.CONSUMER_COUNT_DESCRIPTION, Collections.emptyList());
});
}
}

View File

@ -25,4 +25,6 @@ public class BrokerMetricNames {
public static final String DISK_STORE_USAGE = "disk.store.usage";
public static final String REPLICA_SYNC = "replica.sync";
public static final String ACTIVE = "active";
public static final String AUTHENTICATION_COUNT = "authentication.count";
public static final String AUTHORIZATION_COUNT = "authorization.count";
}

View File

@ -26,9 +26,12 @@ import java.util.function.Consumer;
import java.util.function.ToDoubleFunction;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Gauge.Builder;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
@ -39,6 +42,8 @@ import io.micrometer.core.instrument.binder.system.UptimeMetrics;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -59,7 +64,8 @@ public class MetricsManager {
public MetricsManager(String brokerName,
MetricsConfiguration metricsConfiguration,
HierarchicalRepository<AddressSettings> addressSettingsRepository) {
HierarchicalRepository<AddressSettings> addressSettingsRepository,
SecurityStore securityStore) {
this.brokerName = brokerName;
this.meterRegistry = metricsConfiguration.getPlugin().getRegistry();
this.addressSettingsRepository = addressSettingsRepository;
@ -90,6 +96,10 @@ public class MetricsManager {
if (metricsConfiguration.isLogging()) {
new Log4j2Metrics().bindTo(meterRegistry);
}
if (metricsConfiguration.isSecurityCaches() && securityStore.isSecurityEnabled()) {
CaffeineCacheMetrics.monitor(meterRegistry, ((SecurityStoreImpl)securityStore).getAuthenticationCache(), "authentication");
CaffeineCacheMetrics.monitor(meterRegistry, ((SecurityStoreImpl)securityStore).getAuthorizationCache(), "authorization");
}
}
}
@ -100,19 +110,20 @@ public class MetricsManager {
@FunctionalInterface
public interface MetricGaugeBuilder {
void build(String metricName, Object state, ToDoubleFunction f, String description);
void build(String metricName, Object state, ToDoubleFunction<Object> f, String description, List<Tag> tags);
}
public void registerQueueGauge(String address, String queue, Consumer<MetricGaugeBuilder> builder) {
if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
return;
}
final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
final List<Builder<Object>> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description, tags) -> {
Builder<Object> meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("address", address)
.tag("queue", queue)
.tags(tags)
.description(description);
gaugeBuilders.add(meter);
});
@ -123,11 +134,12 @@ public class MetricsManager {
if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
return;
}
final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
final List<Builder<Object>> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description, tags) -> {
Builder<Object> meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("address", address)
.tags(tags)
.description(description);
gaugeBuilders.add(meter);
});
@ -138,23 +150,24 @@ public class MetricsManager {
if (this.meterRegistry == null) {
return;
}
final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
final List<Builder<Object>> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description, tags) -> {
Builder<Object> meter = Gauge
.builder("artemis." + metricName, state, f)
.tags(tags)
.description(description);
gaugeBuilders.add(meter);
});
registerMeters(gaugeBuilders, ResourceNames.BROKER + "." + brokerName);
}
private void registerMeters(List<Gauge.Builder> gaugeBuilders, String resource) {
private void registerMeters(List<Builder<Object>> gaugeBuilders, String resource) {
if (meters.get(resource) != null) {
throw ActiveMQMessageBundle.BUNDLE.metersAlreadyRegistered(resource);
}
logger.debug("Registering meters for {}", resource);
List<Meter> newMeters = new ArrayList<>(gaugeBuilders.size());
for (Gauge.Builder gaugeBuilder : gaugeBuilders) {
for (Builder<Object> gaugeBuilder : gaugeBuilders) {
Gauge gauge = gaugeBuilder.register(meterRegistry);
newMeters.add(gauge);
logger.debug("Registered meter: {}", gauge.getId());

View File

@ -4913,6 +4913,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="security-caches" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether to report metrics for the authentication and authorization caches
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="plugin" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:annotation>

View File

@ -157,5 +157,7 @@ public class DefaultsFileConfigurationTest extends AbstractConfigurationTestBase
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultUptimeMetrics(), conf.getMetricsConfiguration().isUptime());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultLoggingMetrics(), conf.getMetricsConfiguration().isLogging());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultSecurityCacheMetrics(), conf.getMetricsConfiguration().isSecurityCaches());
}
}

View File

@ -643,6 +643,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
assertTrue(metricsConfiguration.isProcessor());
assertTrue(metricsConfiguration.isUptime());
assertTrue(metricsConfiguration.isLogging());
assertTrue(metricsConfiguration.isSecurityCaches());
}
private void verifyAddresses() {

View File

@ -385,6 +385,7 @@
<processor>true</processor>
<uptime>true</uptime>
<logging>true</logging>
<security-caches>true</security-caches>
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>

View File

@ -272,6 +272,7 @@
<processor>true</processor>
<uptime>true</uptime>
<logging>true</logging>
<security-caches>true</security-caches>
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>

View File

@ -23,6 +23,7 @@
<processor>true</processor>
<uptime>true</uptime>
<logging>true</logging>
<security-caches>true</security-caches>
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>

View File

@ -28,6 +28,7 @@
<processor>true</processor>
<uptime>true</uptime>
<logging>true</logging>
<security-caches>true</security-caches>
<plugin class-name="org.apache.activemq.artemis.core.config.impl.FileConfigurationTest$FakeMetricPlugin">
<property key="key1" value="value1"/>
<property key="key2" value="value2"/>

View File

@ -45,40 +45,55 @@ It takes no key/value properties for configuration.
The following metrics are exported, categorized by component.
A description for each metric is exported along with the metric itself therefore the description will not be repeated here.
*Every* metric is "tagged" with the `broker` tag (configured via `<name>` in `broker.xml`).
A _tag_ is a piece of metadata that gives context to the metric.
These tags are the foundation of what is sometimes referred to as "dimensional metrics."
Metrics _may_ have additional tags, but at the very least they will all have the `broker` tag.
Lastly, all metrics specifically for ActiveMQ Artemis are prefixed with `artemis.`.
=== Broker
* connection.count
* total.connection.count
* address.memory.usage
* address.memory.usage.percentage
* disk.store.usage
* `connection.count`
* `total.connection.count`
* `address.memory.usage`
* `address.memory.usage.percentage`
* `disk.store.usage`
* `replica.sync`
* `active`
* `authentication.count` tagged by `result` - either `success` or `failure`
* `authorization.count` tagged by `result` - either `success` or `failure`
=== Address
* routed.message.count
* unrouted.message.count
* address.size
* number.of.pages
These metrics are tagged with the `address` tag which reflects the name of the corresponding address.
* `routed.message.count`
* `unrouted.message.count`
* `address.size`
* `number.of.pages`
=== Queue
* message.count
* durable.message.count
* persistent.size
* durable.persistent.size
* delivering.message.count
* delivering.durable.message.count
* delivering.persistent.size
* delivering.durable.persistent.size
* scheduled.message.count
* scheduled.durable.message.count
* scheduled.persistent.size
* scheduled.durable.persistent.size
* messages.acknowledged
* messages.added
* messages.killed
* messages.expired
* consumer.count
These metrics are tagged with the `address` & `queue` tags which reflects the name of the corresponding address & queue respectively.
* `message.count`
* `durable.message.count`
* `persistent.size`
* `durable.persistent.size`
* `delivering.message.count`
* `delivering.durable.message.count`
* `delivering.persistent.size`
* `delivering.durable.persistent.size`
* `scheduled.message.count`
* `scheduled.durable.message.count`
* `scheduled.persistent.size`
* `scheduled.durable.persistent.size`
* `messages.acknowledged`
* `messages.added`
* `messages.killed`
* `messages.expired`
* `consumer.count`
It may appear that some higher level broker metrics are missing (e.g. total message count).
However, these metrics can be deduced by aggregating the lower level metrics (e.g. aggregate the message.count metrics from all queues to get the total).
@ -86,6 +101,7 @@ However, these metrics can be deduced by aggregating the lower level metrics (e.
=== Optional metrics
There are a handful of other useful metrics that are related to the JVM, the underlying operating system, etc.
These metrics are provided specifically by Micrometer and therefore do not have the `artmemis.` prefix.
JVM memory metrics::
Gauges buffer and memory pool utilization.
@ -128,6 +144,16 @@ Disabled by default.
This works _exclusively_ with Log4j2 (i.e the default logging implementation shipped with the broker).
If you're embedding the broker and using a different logging implementation (e.g. Log4j 1.x, JUL, Logback, etc.) and you enable these metrics then the broker will fail to start with a `java.lang.NoClassDefFoundError` as it attempts to locate Log4j2 classes that don't exist on the classpath.
====
Security caches::
The following authentication & authorization cache metrics are exported. They are all tagged by `cache` (either `authentication` or `authorization`). Additional tags are noted.
* `cache.size`
* `cache.puts`
* `cache.gets` tagged by `result` - either `hit` or `miss`
* `cache.evictions`
* `cache.eviction.weight`
+
Disabled by default.
== Configuration
@ -148,6 +174,7 @@ Here's a configuration with all optional metrics:
<processor>true</processor> <!-- defaults to false -->
<uptime>true</uptime> <!-- defaults to false -->
<logging>true</logging> <!-- defaults to false -->
<security-caches>true</security-caches> <!-- defaults to false -->
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.LoggingMetricsPlugin"/>
</metrics>
----

View File

@ -50,6 +50,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQTimeoutException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -318,6 +319,57 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertEquals(0, serverControl.getAuthorizationCacheSize());
}
@Test
public void testAuthCounts() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
Assert.assertEquals(0, serverControl.getAuthenticationSuccessCount());
Assert.assertEquals(0, serverControl.getAuthenticationFailureCount());
Assert.assertEquals(0, serverControl.getAuthorizationSuccessCount());
Assert.assertEquals(0, serverControl.getAuthorizationFailureCount());
ServerLocator loc = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(loc);
ClientSession successSession = csf.createSession("myUser", "myPass", false, true, false, false, 0);
successSession.start();
final String address = "ADDRESS";
serverControl.createAddress(address, "MULTICAST");
ClientProducer producer = successSession.createProducer(address);
ClientMessage m = successSession.createMessage(true);
m.putStringProperty("hello", "world");
producer.send(m);
Assert.assertEquals(1, serverControl.getAuthenticationSuccessCount());
Assert.assertEquals(0, serverControl.getAuthenticationFailureCount());
Assert.assertEquals(1, serverControl.getAuthorizationSuccessCount());
Assert.assertEquals(0, serverControl.getAuthorizationFailureCount());
final String queue = "QUEUE";
server.createQueue(new QueueConfiguration(queue).setAddress(address).setRoutingType(RoutingType.MULTICAST));
ClientSession failedAuthzSession = csf.createSession("none", "none", false, true, false, false, 0);
try {
failedAuthzSession.createConsumer(queue);
} catch (ActiveMQSecurityException e) {
// expected
}
Assert.assertEquals(2, serverControl.getAuthenticationSuccessCount());
Assert.assertEquals(0, serverControl.getAuthenticationFailureCount());
Assert.assertEquals(1, serverControl.getAuthorizationSuccessCount());
Assert.assertEquals(1, serverControl.getAuthorizationFailureCount());
try {
csf.createSession("none", "badpassword", false, true, false, false, 0);
} catch (ActiveMQSecurityException e) {
// expected
}
Assert.assertEquals(2, serverControl.getAuthenticationSuccessCount());
Assert.assertEquals(1, serverControl.getAuthenticationFailureCount());
Assert.assertEquals(1, serverControl.getAuthorizationSuccessCount());
Assert.assertEquals(1, serverControl.getAuthorizationFailureCount());
}
@Test
public void testGetConnectors() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
@ -6161,8 +6213,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
SecurityConfiguration securityConfiguration = new SecurityConfiguration();
securityConfiguration.addUser("guest", "guest");
securityConfiguration.addUser("myUser", "myPass");
securityConfiguration.addUser("none", "none");
securityConfiguration.addRole("guest", "guest");
securityConfiguration.addRole("myUser", "guest");
securityConfiguration.addRole("none", "none");
securityConfiguration.setDefaultUser("guest");
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), securityConfiguration);
conf.setJournalRetentionDirectory(conf.getJournalDirectory() + "_ret"); // needed for replay tests
@ -6172,6 +6226,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
HashSet<Role> role = new HashSet<>();
role.add(new Role("guest", true, true, true, true, true, true, true, true, true, true, false, false));
role.add(new Role("none", false, false, false, false, false, false, false, false, false, false, false, false));
server.getSecurityRepository().addMatch("#", role);
}

View File

@ -1800,6 +1800,26 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
public void clearAuthorizationCache() throws Exception {
proxy.invokeOperation("clearAuthorizationCache");
}
@Override
public long getAuthenticationSuccessCount() {
return (long) proxy.retrieveAttributeValue("authenticationSuccessCount");
}
@Override
public long getAuthenticationFailureCount() {
return (long) proxy.retrieveAttributeValue("authenticationFailureCount");
}
@Override
public long getAuthorizationSuccessCount() {
return (long) proxy.retrieveAttributeValue("authorizationSuccessCount");
}
@Override
public long getAuthorizationFailureCount() {
return (long) proxy.retrieveAttributeValue("authorizationFailureCount");
}
};
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class CacheMetricsTest extends ActiveMQTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Test
public void testCacheMetricsEnabled() throws Exception {
testCacheMetrics(true);
}
@Test
public void testCacheMetricsDisabled() throws Exception {
testCacheMetrics(false);
}
private void testCacheMetrics(boolean enabled) throws Exception {
ActiveMQServer server = createServer(false, createDefaultInVMConfig().setSecurityEnabled(true)
.setMetricsConfiguration(new MetricsConfiguration()
.setPlugin(new SimpleMetricsPlugin().init(null))
.setSecurityCaches(enabled)));
server.start();
List<Meter.Id> metersToMatch = new ArrayList<>();
for (String cacheTagValue : Arrays.asList("authentication", "authorization")) {
Tags defaultTags = Tags.of(Tag.of("broker", "localhost"), Tag.of("cache", cacheTagValue));
metersToMatch.add(new Meter.Id("cache.size", defaultTags, null, null, null));
metersToMatch.add(new Meter.Id("cache.puts", defaultTags, null, null, null));
metersToMatch.add(new Meter.Id("cache.gets", defaultTags.and(Tag.of("result", "miss")), null, null, null));
metersToMatch.add(new Meter.Id("cache.gets", defaultTags.and(Tag.of("result", "hit")), null, null, null));
metersToMatch.add(new Meter.Id("cache.evictions", defaultTags, null, null, null));
metersToMatch.add(new Meter.Id("cache.eviction.weight", defaultTags, null, null, null));
}
assertEquals(enabled, MetricsPluginTest.getMetrics(server).keySet().containsAll(metersToMatch));
}
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.tests.integration.plugin;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -30,6 +31,7 @@ import java.util.stream.Collectors;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -97,15 +99,21 @@ public class MetricsPluginTest extends ActiveMQTestBase {
class Metric {
public final String name;
public final Double value;
public final List<Tag> tags;
private Metric(String name, Double value) {
this(name, value, Collections.EMPTY_LIST);
}
private Metric(String name, Double value, List<Tag> tags) {
this.name = name;
this.value = value;
this.tags = tags;
}
@Override
public String toString() {
return name + ": " + value;
return name + ": " + value + ": " + tags;
}
@Override
@ -114,12 +122,13 @@ public class MetricsPluginTest extends ActiveMQTestBase {
if (o == null || getClass() != o.getClass()) return false;
Metric metric = (Metric) o;
return Objects.equals(name, metric.name) &&
Objects.equals(value, metric.value);
Objects.equals(value, metric.value) &&
Objects.equals(tags, metric.tags);
}
@Override
public int hashCode() {
return Objects.hash(name, value);
return Objects.hash(name, value, tags);
}
}
@ -131,44 +140,52 @@ public class MetricsPluginTest extends ActiveMQTestBase {
List<Metric> artemisMetrics = metrics.entrySet().stream()
.map(entry -> new Metric(
entry.getKey().getName(),
entry.getValue()))
entry.getValue(),
entry.getKey().getTags()))
.filter(metric -> metric.name.startsWith("artemis"))
.collect(Collectors.toList());
assertThat(artemisMetrics, containsInAnyOrder(
// artemis.(un)routed.message.count is present twice, because of activemq.notifications address
new Metric("artemis.address.memory.usage", 0.0),
new Metric("artemis.address.memory.usage.percentage", 0.0),
new Metric("artemis.connection.count", 1.0),
new Metric("artemis.consumer.count", 0.0),
new Metric("artemis.delivering.durable.message.count", 0.0),
new Metric("artemis.delivering.durable.persistent.size", 0.0),
new Metric("artemis.delivering.message.count", 0.0),
new Metric("artemis.delivering.persistent_size", 0.0),
new Metric("artemis.disk.store.usage", 0.0),
new Metric("artemis.durable.message.count", 0.0),
new Metric("artemis.durable.persistent.size", 0.0),
new Metric("artemis.message.count", 0.0),
new Metric("artemis.messages.acknowledged", 0.0),
new Metric("artemis.messages.added", 0.0),
new Metric("artemis.messages.expired", 0.0),
new Metric("artemis.messages.killed", 0.0),
new Metric("artemis.persistent.size", 0.0),
new Metric("artemis.routed.message.count", 0.0),
new Metric("artemis.routed.message.count", 0.0),
new Metric("artemis.scheduled.durable.message.count", 0.0),
new Metric("artemis.scheduled.durable.persistent.size", 0.0),
new Metric("artemis.scheduled.message.count", 0.0),
new Metric("artemis.scheduled.persistent.size", 0.0),
new Metric("artemis.total.connection.count", 1.0),
new Metric("artemis.unrouted.message.count", 0.0),
new Metric("artemis.unrouted.message.count", 2.0),
new Metric("artemis.address.size", 0.0),
new Metric("artemis.address.size", 0.0),
new Metric("artemis.number.of.pages", 0.0),
new Metric("artemis.number.of.pages", 0.0),
new Metric("artemis.active", 1.0),
new Metric("artemis.replica.sync", 0.0)
// broker metrics
new Metric("artemis.address.memory.usage", 0.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.address.memory.usage.percentage", 0.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.connection.count", 1.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.total.connection.count", 1.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.active", 1.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.replica.sync", 0.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.disk.store.usage", 0.0, Arrays.asList(Tag.of("broker", "localhost"))),
new Metric("artemis.authentication.count", 0.0, Arrays.asList(Tag.of("broker", "localhost"), Tag.of("result", "success"))),
new Metric("artemis.authentication.count", 0.0, Arrays.asList(Tag.of("broker", "localhost"), Tag.of("result", "failure"))),
new Metric("artemis.authorization.count", 0.0, Arrays.asList(Tag.of("broker", "localhost"), Tag.of("result", "success"))),
new Metric("artemis.authorization.count", 0.0, Arrays.asList(Tag.of("broker", "localhost"), Tag.of("result", "failure"))),
// simpleQueue metrics
new Metric("artemis.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.durable.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.persistent.size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.durable.persistent.size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.delivering.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.delivering.durable.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.delivering.persistent_size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.delivering.durable.persistent.size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.scheduled.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.scheduled.durable.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.scheduled.persistent.size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.scheduled.durable.persistent.size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.messages.acknowledged", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.messages.added", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.messages.killed", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.messages.expired", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
new Metric("artemis.consumer.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"), Tag.of("queue", "simpleQueue"))),
// simpleAddress metrics
new Metric("artemis.routed.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"))),
new Metric("artemis.unrouted.message.count", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"))),
new Metric("artemis.address.size", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"))),
new Metric("artemis.number.of.pages", 0.0, Arrays.asList(Tag.of("address", "simpleAddress"), Tag.of("broker", "localhost"))),
// activemq.notifications metrics
new Metric("artemis.routed.message.count", 0.0, Arrays.asList(Tag.of("address", "activemq.notifications"), Tag.of("broker", "localhost"))),
new Metric("artemis.unrouted.message.count", 2.0, Arrays.asList(Tag.of("address", "activemq.notifications"), Tag.of("broker", "localhost"))),
new Metric("artemis.address.size", 0.0, Arrays.asList(Tag.of("address", "activemq.notifications"), Tag.of("broker", "localhost"))),
new Metric("artemis.number.of.pages", 0.0, Arrays.asList(Tag.of("address", "activemq.notifications"), Tag.of("broker", "localhost")))
));
}