diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6ace528ee2..33e2887a12 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1070,8 +1070,8 @@ public interface ActiveMQServerLogger { @LogMessage(id = 222255, value = "Unable to calculate file store usage", level = LogMessage.Level.WARN) void unableToCalculateFileStoreUsage(Exception e); - @LogMessage(id = 222256, value = "Failed to unregister acceptors", level = LogMessage.Level.WARN) - void failedToUnregisterAcceptors(Exception e); + @LogMessage(id = 222256, value = "Failed to unregister acceptor: {}", level = LogMessage.Level.WARN) + void failedToUnregisterAcceptor(String acceptor, Exception e); @LogMessage(id = 222257, value = "Failed to decrement message reference count", level = LogMessage.Level.WARN) void failedToDecrementMessageReferenceCount(Exception e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index ff5fceb39f..113418a5cc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -26,7 +26,6 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -152,7 +151,7 @@ public class ManagementServiceImpl implements ManagementService { private final SimpleString managementAddress; - private boolean started = false; + private volatile boolean started = false; private final boolean messageCounterEnabled; @@ -166,6 +165,7 @@ public class ManagementServiceImpl implements ManagementService { private final Pattern viewPermissionMatcher; + private final Set registeredNames = new ConcurrentHashSet<>(); public ManagementServiceImpl(final MBeanServer mbeanServer, final Configuration configuration) { this.mbeanServer = mbeanServer; @@ -255,9 +255,8 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public synchronized void unregisterServer() throws Exception { - ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName(); - unregisterFromJMX(objectName); + public void unregisterServer() throws Exception { + unregisterFromJMX(objectNameBuilder.getActiveMQServerObjectName()); unregisterFromRegistry(ResourceNames.BROKER); if (messagingServer != null) { unregisterMeters(ResourceNames.BROKER + "." + messagingServer.getConfiguration().getName()); @@ -266,16 +265,10 @@ public class ManagementServiceImpl implements ManagementService { @Override public void registerAddress(AddressInfo addressInfo) throws Exception { - ObjectName objectName = objectNameBuilder.getAddressObjectName(addressInfo.getName()); AddressControlImpl addressControl = new AddressControlImpl(addressInfo, messagingServer, pagingManager, storageManager, securityRepository, securityStore, this); - - registerInJMX(objectName, addressControl); - + registerInJMX(objectNameBuilder.getAddressObjectName(addressInfo.getName()), addressControl); registerInRegistry(ResourceNames.ADDRESS + addressInfo.getName(), addressControl); - registerAddressMeters(addressInfo, addressControl); - - logger.debug("registered address {}", objectName); } @Override @@ -294,43 +287,28 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public synchronized void unregisterAddress(final SimpleString address) throws Exception { - ObjectName objectName = objectNameBuilder.getAddressObjectName(address); - - unregisterFromJMX(objectName); + public void unregisterAddress(final SimpleString address) throws Exception { + unregisterFromJMX(objectNameBuilder.getAddressObjectName(address)); unregisterFromRegistry(ResourceNames.ADDRESS + address); unregisterMeters(ResourceNames.ADDRESS + address); } - public synchronized void registerQueue(final Queue queue, - final AddressInfo addressInfo, - final StorageManager storageManager) throws Exception { - - QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), messagingServer, storageManager, securityStore, addressSettingsRepository); + @Override + public void registerQueue(final Queue queue, final SimpleString address, final StorageManager storageManager) throws Exception { + QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), messagingServer, storageManager, securityStore, addressSettingsRepository); if (messageCounterManager != null) { MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount()); queueControl.setMessageCounter(counter); messageCounterManager.registerMessageCounter(queue.getName().toString(), counter); } - ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType()); - registerInJMX(objectName, queueControl); + registerInJMX(objectNameBuilder.getQueueObjectName(address, queue.getName(), queue.getRoutingType()), queueControl); registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl); registerQueueMeters(queue); - - logger.debug("registered queue {}", objectName); } @Override - public synchronized void registerQueue(final Queue queue, - final SimpleString address, - final StorageManager storageManager) throws Exception { - registerQueue(queue, new AddressInfo(address), storageManager); - } - - @Override - public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception { - ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType); - unregisterFromJMX(objectName); + public void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception { + unregisterFromJMX(objectNameBuilder.getQueueObjectName(address, name, routingType)); unregisterFromRegistry(ResourceNames.QUEUE + name); unregisterMeters(ResourceNames.QUEUE + name); if (messageCounterManager != null) { @@ -378,63 +356,47 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public synchronized void registerDivert(final Divert divert) throws Exception { - ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString()); + public void registerDivert(final Divert divert) throws Exception { DivertControl divertControl = new DivertControlImpl(divert, storageManager, messagingServer.getInternalNamingPrefix()); - registerInJMX(objectName, divertControl); + registerInJMX(objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString()), divertControl); registerInRegistry(ResourceNames.DIVERT + divert.getUniqueName(), divertControl); - - logger.debug("registered divert {}", objectName); } @Override - public synchronized void unregisterDivert(final SimpleString name, final SimpleString address) throws Exception { - ObjectName objectName = objectNameBuilder.getDivertObjectName(name.toString(), address.toString()); - unregisterFromJMX(objectName); + public void unregisterDivert(final SimpleString name, final SimpleString address) throws Exception { + unregisterFromJMX(objectNameBuilder.getDivertObjectName(name.toString(), address.toString())); unregisterFromRegistry(ResourceNames.DIVERT + name); } @Override - public synchronized void registerAcceptor(final Acceptor acceptor, - final TransportConfiguration configuration) throws Exception { - ObjectName objectName = objectNameBuilder.getAcceptorObjectName(configuration.getName()); + public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception { AcceptorControl control = new AcceptorControlImpl(acceptor, storageManager, configuration); - registerInJMX(objectName, control); + registerInJMX(objectNameBuilder.getAcceptorObjectName(configuration.getName()), control); registerInRegistry(ResourceNames.ACCEPTOR + configuration.getName(), control); } @Override public void unregisterAcceptors() { - List acceptors = new ArrayList<>(); - synchronized (this) { - for (String resourceName : registry.keySet()) { - if (resourceName.startsWith(ResourceNames.ACCEPTOR)) { - acceptors.add(resourceName); + for (String resourceName : new HashSet<>(registry.keySet())) { + if (resourceName.startsWith(ResourceNames.ACCEPTOR)) { + String name = resourceName.substring(ResourceNames.ACCEPTOR.length()); + try { + unregisterAcceptor(name); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToUnregisterAcceptor(name, e); } } } - - for (String acceptor : acceptors) { - String name = acceptor.substring(ResourceNames.ACCEPTOR.length()); - try { - unregisterAcceptor(name); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.failedToUnregisterAcceptors(e); - } - } } - public synchronized void unregisterAcceptor(final String name) throws Exception { - ObjectName objectName = objectNameBuilder.getAcceptorObjectName(name); - unregisterFromJMX(objectName); + public void unregisterAcceptor(final String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getAcceptorObjectName(name)); unregisterFromRegistry(ResourceNames.ACCEPTOR + name); } @Override - public synchronized void registerBroadcastGroup(final BroadcastGroup broadcastGroup, - final BroadcastGroupConfiguration configuration) throws Exception { + public void registerBroadcastGroup(final BroadcastGroup broadcastGroup, final BroadcastGroupConfiguration configuration) throws Exception { broadcastGroup.setNotificationService(this); - ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName()); BroadcastEndpointFactory endpointFactory = configuration.getEndpointFactory(); BaseBroadcastGroupControl control = null; if (endpointFactory instanceof UDPBroadcastEndpointFactory) { @@ -446,78 +408,66 @@ public class ManagementServiceImpl implements ManagementService { } else { control = new BaseBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration); } - registerInJMX(objectName, control); + registerInJMX(objectNameBuilder.getBroadcastGroupObjectName(configuration.getName()), control); registerInRegistry(ResourceNames.BROADCAST_GROUP + configuration.getName(), control); } @Override - public synchronized void unregisterBroadcastGroup(final String name) throws Exception { - ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(name); - unregisterFromJMX(objectName); + public void unregisterBroadcastGroup(final String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getBroadcastGroupObjectName(name)); unregisterFromRegistry(ResourceNames.BROADCAST_GROUP + name); } @Override - public synchronized void registerBridge(final Bridge bridge) throws Exception { + public void registerBridge(final Bridge bridge) throws Exception { bridge.setNotificationService(this); - ObjectName objectName = objectNameBuilder.getBridgeObjectName(bridge.getConfiguration().getName()); BridgeControl control = new BridgeControlImpl(bridge, storageManager); - registerInJMX(objectName, control); + registerInJMX(objectNameBuilder.getBridgeObjectName(bridge.getConfiguration().getName()), control); registerInRegistry(ResourceNames.BRIDGE + bridge.getName(), control); } @Override - public synchronized void unregisterBridge(final String name) throws Exception { - ObjectName objectName = objectNameBuilder.getBridgeObjectName(name); - unregisterFromJMX(objectName); + public void unregisterBridge(final String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getBridgeObjectName(name)); unregisterFromRegistry(ResourceNames.BRIDGE + name); } @Override - public synchronized void registerCluster(final ClusterConnection cluster, - final ClusterConnectionConfiguration configuration) throws Exception { - ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(configuration.getName()); + public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception { ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration); - registerInJMX(objectName, control); + registerInJMX(objectNameBuilder.getClusterConnectionObjectName(configuration.getName()), control); registerInRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + configuration.getName(), control); } @Override - public synchronized void unregisterCluster(final String name) throws Exception { - ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(name); - unregisterFromJMX(objectName); + public void unregisterCluster(final String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getClusterConnectionObjectName(name)); unregisterFromRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + name); } @Override - public synchronized void registerConnectionRouter(final ConnectionRouter router) throws Exception { - ObjectName objectName = objectNameBuilder.getConnectionRouterObjectName(router.getName()); + public void registerConnectionRouter(final ConnectionRouter router) throws Exception { ConnectionRouterControl connectionRouterControl = new ConnectionRouterControlImpl(router, storageManager); - registerInJMX(objectName, connectionRouterControl); + registerInJMX(objectNameBuilder.getConnectionRouterObjectName(router.getName()), connectionRouterControl); registerInRegistry(ResourceNames.CONNECTION_ROUTER + router.getName(), connectionRouterControl); - - logger.debug("registered connection router {}", objectName); } @Override - public synchronized void unregisterConnectionRouter(final String name) throws Exception { - ObjectName objectName = objectNameBuilder.getConnectionRouterObjectName(name); - unregisterFromJMX(objectName); + public void unregisterConnectionRouter(final String name) throws Exception { + unregisterFromJMX(objectNameBuilder.getConnectionRouterObjectName(name)); unregisterFromRegistry(ResourceNames.CONNECTION_ROUTER + name); } @Override public void registerHawtioSecurity(GuardInvocationHandler guard) throws Exception { - ObjectName objectName = objectNameBuilder.getSecurityObjectName(); HawtioSecurityControl control = new HawtioSecurityControlImpl(guard, storageManager); - registerInJMX(objectName, control); + registerInJMX(objectNameBuilder.getSecurityObjectName(), control); registerInRegistry(ResourceNames.MANAGEMENT_SECURITY, control); } @Override public void unregisterHawtioSecurity() throws Exception { - ObjectName objectName = objectNameBuilder.getSecurityObjectName(); - unregisterFromJMX(objectName); + unregisterFromJMX(objectNameBuilder.getSecurityObjectName()); unregisterFromRegistry(ResourceNames.MANAGEMENT_SECURITY); } @@ -607,15 +557,14 @@ public class ManagementServiceImpl implements ManagementService { } @Override - public synchronized Object getResource(final String resourceName) { + public Object getResource(final String resourceName) { return registry.get(resourceName); } @Override - public synchronized Object[] getResources(final Class resourceType) { + public Object[] getResources(final Class resourceType) { List resources = new ArrayList<>(); - Collection clone = new ArrayList<>(registry.values()); - for (Object entry : clone) { + for (Object entry : new ArrayList<>(registry.values())) { if (resourceType.isAssignableFrom(entry.getClass())) { resources.add(entry); } @@ -623,8 +572,6 @@ public class ManagementServiceImpl implements ManagementService { return resources.toArray(new Object[resources.size()]); } - private final Set registeredNames = new HashSet<>(); - @Override public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception { if (!jmxManagementEnabled) { @@ -633,28 +580,13 @@ public class ManagementServiceImpl implements ManagementService { synchronized (mbeanServer) { unregisterFromJMX(objectName); - mbeanServer.registerMBean(managedResource, objectName); - registeredNames.add(objectName); } + logger.debug("Registered in JMX: {} as {}", objectName, managedResource); } - @Override - public synchronized void registerInRegistry(final String resourceName, final Object managedResource) { - unregisterFromRegistry(resourceName); - - logger.debug("Registering {} as {}", resourceName, managedResource); - - registry.put(resourceName, managedResource); - } - - @Override - public synchronized void unregisterFromRegistry(final String resourceName) { - registry.remove(resourceName); - } - - // the JMX unregistration is synchronized to avoid race conditions if 2 clients tries to + // the JMX unregistration is synchronized to avoid race conditions if 2 clients try to // unregister the same resource (e.g. a queue) at the same time since unregisterMBean() // will throw an exception if the MBean has already been unregistered @Override @@ -666,10 +598,30 @@ public class ManagementServiceImpl implements ManagementService { synchronized (mbeanServer) { if (mbeanServer.isRegistered(objectName)) { mbeanServer.unregisterMBean(objectName); - registeredNames.remove(objectName); } } + logger.debug("Unregistered from JMX: {}", objectName); + } + + @Override + public void registerInRegistry(final String resourceName, final Object managedResource) { + Object replaced = registry.put(resourceName, managedResource); + String addendum = ""; + if (replaced != null) { + addendum = ". Replaced: " + replaced; + } + logger.debug("Registered in management: {} as {}{}", resourceName, managedResource, addendum); + } + + @Override + public void unregisterFromRegistry(final String resourceName) { + Object removed = registry.remove(resourceName); + if (removed != null) { + logger.debug("Unregistered from management: {} as {}", resourceName, removed); + } else { + logger.debug("Attempted to unregister {} from management, but it was not registered."); + } } @Override @@ -692,16 +644,16 @@ public class ManagementServiceImpl implements ManagementService { return managementNotificationAddress; } - // ActiveMQComponent implementation ----------------------------- - @Override - public void start() throws Exception { + public synchronized void start() throws Exception { + if (started) { + return; + } + if (messageCounterEnabled) { messageCounterManager.start(); } - started = true; - /** * Ensure the management notification address is created otherwise if auto-create-address = false then cluster * bridges won't be able to connect. @@ -719,10 +671,18 @@ public class ManagementServiceImpl implements ManagementService { } } }); + + started = true; } @Override public synchronized void stop() throws Exception { + if (!started) { + return; + } + + started = false; + Set resourceNames = new HashSet<>(registry.keySet()); for (String resourceName : resourceNames) { @@ -783,8 +743,6 @@ public class ManagementServiceImpl implements ManagementService { storageManager = null; registeredNames.clear(); - - started = false; } @Override @@ -794,60 +752,56 @@ public class ManagementServiceImpl implements ManagementService { @Override public void sendNotification(final Notification notification) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("Sending Notification = {}, notificationEnabled={} messagingServerControl={}", - notification, notificationsEnabled, messagingServerControl); + if (messagingServerControl == null || !notificationsEnabled) { + return; } - // This needs to be synchronized since we need to ensure notifications are processed in strict sequence - synchronized (this) { - if (messagingServerControl != null && notificationsEnabled) { - // We also need to synchronize on the post office notification lock - // otherwise we can get notifications arriving in wrong order / missing - // if a notification occurs at same time as sendQueueInfoToQueue is processed - synchronized (postOffice.getNotificationLock()) { - // First send to any local listeners - for (NotificationListener listener : listeners) { - try { - listener.onNotification(notification); - } catch (Exception e) { - // Exception thrown from one listener should not stop execution of others - ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e); - } - } + // This needs to be synchronized since we need to ensure notifications are processed in strict sequence. + // Furthermore, this needs to synchronize on the PostOffice notificationLock otherwise we can get notifications + // arriving in the wrong order or missing notifications if one occurs at same time as sendQueueInfoToQueue is + // processed. + synchronized (postOffice.getNotificationLock()) { + logger.trace("Sending notification={}", notification); - // start sending notification *messages* only when server has initialised - // Note at backup initialisation we don't want to send notifications either - // https://jira.jboss.org/jira/browse/HORNETQ-317 - if (messagingServer == null || !messagingServer.isActive()) { - logger.debug("ignoring message {} as the server is not initialized", notification); - - return; - } - - long messageID = storageManager.generateID(); - - Message notificationMessage = new CoreMessage(messageID, 512); - - // Notification messages are always durable so the user can choose whether to add a durable queue to - // consume them in - notificationMessage.setDurable(true); - notificationMessage.setAddress(managementNotificationAddress); - - if (notification.getProperties() != null) { - TypedProperties props = notification.getProperties(); - props.forEach(notificationMessage::putObjectProperty); - } - - notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, SimpleString.of(notification.getType().toString())); - - long timestamp = System.currentTimeMillis(); - notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, timestamp); - notificationMessage.setTimestamp(timestamp); - - postOffice.route(notificationMessage, false); + // First send to any local listeners + for (NotificationListener listener : listeners) { + try { + listener.onNotification(notification); + } catch (Exception e) { + // Exception thrown from one listener should not stop execution of others + ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e); } } + + // start sending notification *messages* only when server has initialised + // Note at backup initialisation we don't want to send notifications either + // https://jira.jboss.org/jira/browse/HORNETQ-317 + if (messagingServer == null || !messagingServer.isActive()) { + logger.debug("ignoring message {} as the server is not initialized", notification); + return; + } + + long messageID = storageManager.generateID(); + + Message notificationMessage = new CoreMessage(messageID, 512); + + // Notification messages are always durable so the user can choose whether to add a durable queue to + // consume them in + notificationMessage.setDurable(true); + notificationMessage.setAddress(managementNotificationAddress); + + if (notification.getProperties() != null) { + TypedProperties props = notification.getProperties(); + props.forEach(notificationMessage::putObjectProperty); + } + + notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, SimpleString.of(notification.getType().toString())); + + long timestamp = System.currentTimeMillis(); + notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, timestamp); + notificationMessage.setTimestamp(timestamp); + + postOffice.route(notificationMessage, false); } } @@ -865,7 +819,7 @@ public class ManagementServiceImpl implements ManagementService { throw ActiveMQMessageBundle.BUNDLE.cannotFindResource(resourceName); } - Method method = null; + Method method; String upperCaseAttribute = attribute.substring(0, 1).toUpperCase() + attribute.substring(1); try { @@ -899,7 +853,6 @@ public class ManagementServiceImpl implements ManagementService { throw ActiveMQMessageBundle.BUNDLE.cannotFindResource(resourceName); } - Method method = null; Method[] methods = resource.getClass().getMethods(); @@ -966,5 +919,4 @@ public class ManagementServiceImpl implements ManagementService { return correlationId; } - } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java index f2db2b01c8..a0b5ece37f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java @@ -111,7 +111,7 @@ public class ManagementServiceImplTest { Mockito.when(queue.getRoutingType()).thenReturn(RoutingType.ANYCAST); StorageManager storageManager = Mockito.mock(StorageManager.class); - managementService.registerQueue(queue, new AddressInfo(queueName), storageManager); + managementService.registerQueue(queue, queueName, storageManager); managementService.getAttribute(ResourceNames.QUEUE + queueName, "ringSize", auth); expected = SimpleString.of("mm.queue." + queueName + ".getRingSize"); @@ -174,7 +174,7 @@ public class ManagementServiceImplTest { Mockito.when(queue.getRoutingType()).thenReturn(RoutingType.ANYCAST); StorageManager storageManager = Mockito.mock(StorageManager.class); - managementService.registerQueue(queue, new AddressInfo(queueName), storageManager); + managementService.registerQueue(queue, queueName, storageManager); managementService.invokeOperation(ResourceNames.QUEUE + queueName, "getRingSize", new Object[]{}, auth); expected = SimpleString.of("$mm.queue." + queueName + ".getRingSize");