ARTEMIS-4923 reduce synchronization in ManagementServiceImpl

The `ManagementService` is used by the broker to register and
unregister components for management as well as send notifications.
When the broker is busy dealing with new sessions and auto-creating
queues, addresses, etc. there is a lot of contention.

To reduce synchronization and improve the service overall this commit
does the following:

 - Remove `synchronized` from most methods. In most cases it's
   completely unnecessary because the methods are already using a
   thread-safe data-structure (e.g. `ConcurrentHashMap`) or more
   specific synchronization is already in place (e.g. on
   `mbeanServer`).
 - Adds new & clarifies existing logging.
 - Synchronizes `start` & `stop` methods and adds gates via `started`.
 - Simplifies the `sendNotification` method by synchronizing once rather
   than twice and performing legitimacy checks sooner.
 - Removing an unnecessary overload of the `registereQueue` method.

To be clear, there are no tests included with this commit as there
should be no semantic changes. Existing tests should be sufficient to
identify any regressions.
This commit is contained in:
Justin Bertram 2024-07-12 12:29:29 -05:00 committed by Timothy Bish
parent 26cc17c337
commit 10adca5479
3 changed files with 135 additions and 183 deletions

View File

@ -1070,8 +1070,8 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222255, value = "Unable to calculate file store usage", level = LogMessage.Level.WARN)
void unableToCalculateFileStoreUsage(Exception e);
@LogMessage(id = 222256, value = "Failed to unregister acceptors", level = LogMessage.Level.WARN)
void failedToUnregisterAcceptors(Exception e);
@LogMessage(id = 222256, value = "Failed to unregister acceptor: {}", level = LogMessage.Level.WARN)
void failedToUnregisterAcceptor(String acceptor, Exception e);
@LogMessage(id = 222257, value = "Failed to decrement message reference count", level = LogMessage.Level.WARN)
void failedToDecrementMessageReferenceCount(Exception e);

View File

@ -26,7 +26,6 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -152,7 +151,7 @@ public class ManagementServiceImpl implements ManagementService {
private final SimpleString managementAddress;
private boolean started = false;
private volatile boolean started = false;
private final boolean messageCounterEnabled;
@ -166,6 +165,7 @@ public class ManagementServiceImpl implements ManagementService {
private final Pattern viewPermissionMatcher;
private final Set<ObjectName> registeredNames = new ConcurrentHashSet<>();
public ManagementServiceImpl(final MBeanServer mbeanServer, final Configuration configuration) {
this.mbeanServer = mbeanServer;
@ -255,9 +255,8 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
public synchronized void unregisterServer() throws Exception {
ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
unregisterFromJMX(objectName);
public void unregisterServer() throws Exception {
unregisterFromJMX(objectNameBuilder.getActiveMQServerObjectName());
unregisterFromRegistry(ResourceNames.BROKER);
if (messagingServer != null) {
unregisterMeters(ResourceNames.BROKER + "." + messagingServer.getConfiguration().getName());
@ -266,16 +265,10 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public void registerAddress(AddressInfo addressInfo) throws Exception {
ObjectName objectName = objectNameBuilder.getAddressObjectName(addressInfo.getName());
AddressControlImpl addressControl = new AddressControlImpl(addressInfo, messagingServer, pagingManager, storageManager, securityRepository, securityStore, this);
registerInJMX(objectName, addressControl);
registerInJMX(objectNameBuilder.getAddressObjectName(addressInfo.getName()), addressControl);
registerInRegistry(ResourceNames.ADDRESS + addressInfo.getName(), addressControl);
registerAddressMeters(addressInfo, addressControl);
logger.debug("registered address {}", objectName);
}
@Override
@ -294,43 +287,28 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
public synchronized void unregisterAddress(final SimpleString address) throws Exception {
ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
unregisterFromJMX(objectName);
public void unregisterAddress(final SimpleString address) throws Exception {
unregisterFromJMX(objectNameBuilder.getAddressObjectName(address));
unregisterFromRegistry(ResourceNames.ADDRESS + address);
unregisterMeters(ResourceNames.ADDRESS + address);
}
public synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager) throws Exception {
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), messagingServer, storageManager, securityStore, addressSettingsRepository);
@Override
public void registerQueue(final Queue queue, final SimpleString address, final StorageManager storageManager) throws Exception {
QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), messagingServer, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
}
ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType());
registerInJMX(objectName, queueControl);
registerInJMX(objectNameBuilder.getQueueObjectName(address, queue.getName(), queue.getRoutingType()), queueControl);
registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
registerQueueMeters(queue);
logger.debug("registered queue {}", objectName);
}
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final StorageManager storageManager) throws Exception {
registerQueue(queue, new AddressInfo(address), storageManager);
}
@Override
public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
unregisterFromJMX(objectName);
public void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {
unregisterFromJMX(objectNameBuilder.getQueueObjectName(address, name, routingType));
unregisterFromRegistry(ResourceNames.QUEUE + name);
unregisterMeters(ResourceNames.QUEUE + name);
if (messageCounterManager != null) {
@ -378,63 +356,47 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
public synchronized void registerDivert(final Divert divert) throws Exception {
ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString());
public void registerDivert(final Divert divert) throws Exception {
DivertControl divertControl = new DivertControlImpl(divert, storageManager, messagingServer.getInternalNamingPrefix());
registerInJMX(objectName, divertControl);
registerInJMX(objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString()), divertControl);
registerInRegistry(ResourceNames.DIVERT + divert.getUniqueName(), divertControl);
logger.debug("registered divert {}", objectName);
}
@Override
public synchronized void unregisterDivert(final SimpleString name, final SimpleString address) throws Exception {
ObjectName objectName = objectNameBuilder.getDivertObjectName(name.toString(), address.toString());
unregisterFromJMX(objectName);
public void unregisterDivert(final SimpleString name, final SimpleString address) throws Exception {
unregisterFromJMX(objectNameBuilder.getDivertObjectName(name.toString(), address.toString()));
unregisterFromRegistry(ResourceNames.DIVERT + name);
}
@Override
public synchronized void registerAcceptor(final Acceptor acceptor,
final TransportConfiguration configuration) throws Exception {
ObjectName objectName = objectNameBuilder.getAcceptorObjectName(configuration.getName());
public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception {
AcceptorControl control = new AcceptorControlImpl(acceptor, storageManager, configuration);
registerInJMX(objectName, control);
registerInJMX(objectNameBuilder.getAcceptorObjectName(configuration.getName()), control);
registerInRegistry(ResourceNames.ACCEPTOR + configuration.getName(), control);
}
@Override
public void unregisterAcceptors() {
List<String> acceptors = new ArrayList<>();
synchronized (this) {
for (String resourceName : registry.keySet()) {
if (resourceName.startsWith(ResourceNames.ACCEPTOR)) {
acceptors.add(resourceName);
for (String resourceName : new HashSet<>(registry.keySet())) {
if (resourceName.startsWith(ResourceNames.ACCEPTOR)) {
String name = resourceName.substring(ResourceNames.ACCEPTOR.length());
try {
unregisterAcceptor(name);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToUnregisterAcceptor(name, e);
}
}
}
for (String acceptor : acceptors) {
String name = acceptor.substring(ResourceNames.ACCEPTOR.length());
try {
unregisterAcceptor(name);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToUnregisterAcceptors(e);
}
}
}
public synchronized void unregisterAcceptor(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getAcceptorObjectName(name);
unregisterFromJMX(objectName);
public void unregisterAcceptor(final String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getAcceptorObjectName(name));
unregisterFromRegistry(ResourceNames.ACCEPTOR + name);
}
@Override
public synchronized void registerBroadcastGroup(final BroadcastGroup broadcastGroup,
final BroadcastGroupConfiguration configuration) throws Exception {
public void registerBroadcastGroup(final BroadcastGroup broadcastGroup, final BroadcastGroupConfiguration configuration) throws Exception {
broadcastGroup.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName());
BroadcastEndpointFactory endpointFactory = configuration.getEndpointFactory();
BaseBroadcastGroupControl control = null;
if (endpointFactory instanceof UDPBroadcastEndpointFactory) {
@ -446,78 +408,66 @@ public class ManagementServiceImpl implements ManagementService {
} else {
control = new BaseBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration);
}
registerInJMX(objectName, control);
registerInJMX(objectNameBuilder.getBroadcastGroupObjectName(configuration.getName()), control);
registerInRegistry(ResourceNames.BROADCAST_GROUP + configuration.getName(), control);
}
@Override
public synchronized void unregisterBroadcastGroup(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(name);
unregisterFromJMX(objectName);
public void unregisterBroadcastGroup(final String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getBroadcastGroupObjectName(name));
unregisterFromRegistry(ResourceNames.BROADCAST_GROUP + name);
}
@Override
public synchronized void registerBridge(final Bridge bridge) throws Exception {
public void registerBridge(final Bridge bridge) throws Exception {
bridge.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getBridgeObjectName(bridge.getConfiguration().getName());
BridgeControl control = new BridgeControlImpl(bridge, storageManager);
registerInJMX(objectName, control);
registerInJMX(objectNameBuilder.getBridgeObjectName(bridge.getConfiguration().getName()), control);
registerInRegistry(ResourceNames.BRIDGE + bridge.getName(), control);
}
@Override
public synchronized void unregisterBridge(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getBridgeObjectName(name);
unregisterFromJMX(objectName);
public void unregisterBridge(final String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getBridgeObjectName(name));
unregisterFromRegistry(ResourceNames.BRIDGE + name);
}
@Override
public synchronized void registerCluster(final ClusterConnection cluster,
final ClusterConnectionConfiguration configuration) throws Exception {
ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(configuration.getName());
public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception {
ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration);
registerInJMX(objectName, control);
registerInJMX(objectNameBuilder.getClusterConnectionObjectName(configuration.getName()), control);
registerInRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + configuration.getName(), control);
}
@Override
public synchronized void unregisterCluster(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(name);
unregisterFromJMX(objectName);
public void unregisterCluster(final String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getClusterConnectionObjectName(name));
unregisterFromRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + name);
}
@Override
public synchronized void registerConnectionRouter(final ConnectionRouter router) throws Exception {
ObjectName objectName = objectNameBuilder.getConnectionRouterObjectName(router.getName());
public void registerConnectionRouter(final ConnectionRouter router) throws Exception {
ConnectionRouterControl connectionRouterControl = new ConnectionRouterControlImpl(router, storageManager);
registerInJMX(objectName, connectionRouterControl);
registerInJMX(objectNameBuilder.getConnectionRouterObjectName(router.getName()), connectionRouterControl);
registerInRegistry(ResourceNames.CONNECTION_ROUTER + router.getName(), connectionRouterControl);
logger.debug("registered connection router {}", objectName);
}
@Override
public synchronized void unregisterConnectionRouter(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getConnectionRouterObjectName(name);
unregisterFromJMX(objectName);
public void unregisterConnectionRouter(final String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getConnectionRouterObjectName(name));
unregisterFromRegistry(ResourceNames.CONNECTION_ROUTER + name);
}
@Override
public void registerHawtioSecurity(GuardInvocationHandler guard) throws Exception {
ObjectName objectName = objectNameBuilder.getSecurityObjectName();
HawtioSecurityControl control = new HawtioSecurityControlImpl(guard, storageManager);
registerInJMX(objectName, control);
registerInJMX(objectNameBuilder.getSecurityObjectName(), control);
registerInRegistry(ResourceNames.MANAGEMENT_SECURITY, control);
}
@Override
public void unregisterHawtioSecurity() throws Exception {
ObjectName objectName = objectNameBuilder.getSecurityObjectName();
unregisterFromJMX(objectName);
unregisterFromJMX(objectNameBuilder.getSecurityObjectName());
unregisterFromRegistry(ResourceNames.MANAGEMENT_SECURITY);
}
@ -607,15 +557,14 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
public synchronized Object getResource(final String resourceName) {
public Object getResource(final String resourceName) {
return registry.get(resourceName);
}
@Override
public synchronized Object[] getResources(final Class<?> resourceType) {
public Object[] getResources(final Class<?> resourceType) {
List<Object> resources = new ArrayList<>();
Collection<Object> clone = new ArrayList<>(registry.values());
for (Object entry : clone) {
for (Object entry : new ArrayList<>(registry.values())) {
if (resourceType.isAssignableFrom(entry.getClass())) {
resources.add(entry);
}
@ -623,8 +572,6 @@ public class ManagementServiceImpl implements ManagementService {
return resources.toArray(new Object[resources.size()]);
}
private final Set<ObjectName> registeredNames = new HashSet<>();
@Override
public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception {
if (!jmxManagementEnabled) {
@ -633,28 +580,13 @@ public class ManagementServiceImpl implements ManagementService {
synchronized (mbeanServer) {
unregisterFromJMX(objectName);
mbeanServer.registerMBean(managedResource, objectName);
registeredNames.add(objectName);
}
logger.debug("Registered in JMX: {} as {}", objectName, managedResource);
}
@Override
public synchronized void registerInRegistry(final String resourceName, final Object managedResource) {
unregisterFromRegistry(resourceName);
logger.debug("Registering {} as {}", resourceName, managedResource);
registry.put(resourceName, managedResource);
}
@Override
public synchronized void unregisterFromRegistry(final String resourceName) {
registry.remove(resourceName);
}
// the JMX unregistration is synchronized to avoid race conditions if 2 clients tries to
// the JMX unregistration is synchronized to avoid race conditions if 2 clients try to
// unregister the same resource (e.g. a queue) at the same time since unregisterMBean()
// will throw an exception if the MBean has already been unregistered
@Override
@ -666,10 +598,30 @@ public class ManagementServiceImpl implements ManagementService {
synchronized (mbeanServer) {
if (mbeanServer.isRegistered(objectName)) {
mbeanServer.unregisterMBean(objectName);
registeredNames.remove(objectName);
}
}
logger.debug("Unregistered from JMX: {}", objectName);
}
@Override
public void registerInRegistry(final String resourceName, final Object managedResource) {
Object replaced = registry.put(resourceName, managedResource);
String addendum = "";
if (replaced != null) {
addendum = ". Replaced: " + replaced;
}
logger.debug("Registered in management: {} as {}{}", resourceName, managedResource, addendum);
}
@Override
public void unregisterFromRegistry(final String resourceName) {
Object removed = registry.remove(resourceName);
if (removed != null) {
logger.debug("Unregistered from management: {} as {}", resourceName, removed);
} else {
logger.debug("Attempted to unregister {} from management, but it was not registered.");
}
}
@Override
@ -692,16 +644,16 @@ public class ManagementServiceImpl implements ManagementService {
return managementNotificationAddress;
}
// ActiveMQComponent implementation -----------------------------
@Override
public void start() throws Exception {
public synchronized void start() throws Exception {
if (started) {
return;
}
if (messageCounterEnabled) {
messageCounterManager.start();
}
started = true;
/**
* Ensure the management notification address is created otherwise if auto-create-address = false then cluster
* bridges won't be able to connect.
@ -719,10 +671,18 @@ public class ManagementServiceImpl implements ManagementService {
}
}
});
started = true;
}
@Override
public synchronized void stop() throws Exception {
if (!started) {
return;
}
started = false;
Set<String> resourceNames = new HashSet<>(registry.keySet());
for (String resourceName : resourceNames) {
@ -783,8 +743,6 @@ public class ManagementServiceImpl implements ManagementService {
storageManager = null;
registeredNames.clear();
started = false;
}
@Override
@ -794,60 +752,56 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public void sendNotification(final Notification notification) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Sending Notification = {}, notificationEnabled={} messagingServerControl={}",
notification, notificationsEnabled, messagingServerControl);
if (messagingServerControl == null || !notificationsEnabled) {
return;
}
// This needs to be synchronized since we need to ensure notifications are processed in strict sequence
synchronized (this) {
if (messagingServerControl != null && notificationsEnabled) {
// We also need to synchronize on the post office notification lock
// otherwise we can get notifications arriving in wrong order / missing
// if a notification occurs at same time as sendQueueInfoToQueue is processed
synchronized (postOffice.getNotificationLock()) {
// First send to any local listeners
for (NotificationListener listener : listeners) {
try {
listener.onNotification(notification);
} catch (Exception e) {
// Exception thrown from one listener should not stop execution of others
ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e);
}
}
// This needs to be synchronized since we need to ensure notifications are processed in strict sequence.
// Furthermore, this needs to synchronize on the PostOffice notificationLock otherwise we can get notifications
// arriving in the wrong order or missing notifications if one occurs at same time as sendQueueInfoToQueue is
// processed.
synchronized (postOffice.getNotificationLock()) {
logger.trace("Sending notification={}", notification);
// start sending notification *messages* only when server has initialised
// Note at backup initialisation we don't want to send notifications either
// https://jira.jboss.org/jira/browse/HORNETQ-317
if (messagingServer == null || !messagingServer.isActive()) {
logger.debug("ignoring message {} as the server is not initialized", notification);
return;
}
long messageID = storageManager.generateID();
Message notificationMessage = new CoreMessage(messageID, 512);
// Notification messages are always durable so the user can choose whether to add a durable queue to
// consume them in
notificationMessage.setDurable(true);
notificationMessage.setAddress(managementNotificationAddress);
if (notification.getProperties() != null) {
TypedProperties props = notification.getProperties();
props.forEach(notificationMessage::putObjectProperty);
}
notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, SimpleString.of(notification.getType().toString()));
long timestamp = System.currentTimeMillis();
notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, timestamp);
notificationMessage.setTimestamp(timestamp);
postOffice.route(notificationMessage, false);
// First send to any local listeners
for (NotificationListener listener : listeners) {
try {
listener.onNotification(notification);
} catch (Exception e) {
// Exception thrown from one listener should not stop execution of others
ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e);
}
}
// start sending notification *messages* only when server has initialised
// Note at backup initialisation we don't want to send notifications either
// https://jira.jboss.org/jira/browse/HORNETQ-317
if (messagingServer == null || !messagingServer.isActive()) {
logger.debug("ignoring message {} as the server is not initialized", notification);
return;
}
long messageID = storageManager.generateID();
Message notificationMessage = new CoreMessage(messageID, 512);
// Notification messages are always durable so the user can choose whether to add a durable queue to
// consume them in
notificationMessage.setDurable(true);
notificationMessage.setAddress(managementNotificationAddress);
if (notification.getProperties() != null) {
TypedProperties props = notification.getProperties();
props.forEach(notificationMessage::putObjectProperty);
}
notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, SimpleString.of(notification.getType().toString()));
long timestamp = System.currentTimeMillis();
notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, timestamp);
notificationMessage.setTimestamp(timestamp);
postOffice.route(notificationMessage, false);
}
}
@ -865,7 +819,7 @@ public class ManagementServiceImpl implements ManagementService {
throw ActiveMQMessageBundle.BUNDLE.cannotFindResource(resourceName);
}
Method method = null;
Method method;
String upperCaseAttribute = attribute.substring(0, 1).toUpperCase() + attribute.substring(1);
try {
@ -899,7 +853,6 @@ public class ManagementServiceImpl implements ManagementService {
throw ActiveMQMessageBundle.BUNDLE.cannotFindResource(resourceName);
}
Method method = null;
Method[] methods = resource.getClass().getMethods();
@ -966,5 +919,4 @@ public class ManagementServiceImpl implements ManagementService {
return correlationId;
}
}

View File

@ -111,7 +111,7 @@ public class ManagementServiceImplTest {
Mockito.when(queue.getRoutingType()).thenReturn(RoutingType.ANYCAST);
StorageManager storageManager = Mockito.mock(StorageManager.class);
managementService.registerQueue(queue, new AddressInfo(queueName), storageManager);
managementService.registerQueue(queue, queueName, storageManager);
managementService.getAttribute(ResourceNames.QUEUE + queueName, "ringSize", auth);
expected = SimpleString.of("mm.queue." + queueName + ".getRingSize");
@ -174,7 +174,7 @@ public class ManagementServiceImplTest {
Mockito.when(queue.getRoutingType()).thenReturn(RoutingType.ANYCAST);
StorageManager storageManager = Mockito.mock(StorageManager.class);
managementService.registerQueue(queue, new AddressInfo(queueName), storageManager);
managementService.registerQueue(queue, queueName, storageManager);
managementService.invokeOperation(ResourceNames.QUEUE + queueName, "getRingSize", new Object[]{}, auth);
expected = SimpleString.of("$mm.queue." + queueName + ".getRingSize");