From 4550fcf47c8c87d4e7cc0c06207a09c7de41dc65 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 15 Dec 2022 13:55:03 -0500 Subject: [PATCH] ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests --- .../management/ActiveMQServerControl.java | 4 +- .../impl/ActiveMQServerControlImpl.java | 472 ++++++------ .../management/impl/AddressControlImpl.java | 130 ++-- .../management/impl/QueueControlImpl.java | 687 ++++++++++-------- .../core/server/impl/ActiveMQServerImpl.java | 11 +- .../core/server/replay/ReplayManager.java | 18 +- 6 files changed, 699 insertions(+), 623 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 248f44d4a6..2229abfee5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -526,7 +526,7 @@ public interface ActiveMQServerControl { // Operations ---------------------------------------------------- @Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION) - boolean freezeReplication(); + boolean freezeReplication() throws Exception; @Operation(desc = "Create an address", impact = MBeanOperationInfo.ACTION) String createAddress(@Parameter(name = "name", desc = "The name of the address") String name, @@ -1905,7 +1905,7 @@ public interface ActiveMQServerControl { * Returns the names of the queues created on this server with the given routing-type. */ @Operation(desc = "Names of the queues created on this server with the given routing-type (i.e. ANYCAST or MULTICAST)", impact = MBeanOperationInfo.INFO) - String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType); + String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType) throws Exception; /** * Returns the names of the cluster-connections deployed on this server. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 8e5ce81e08..25deb59e09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -826,17 +826,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public boolean freezeReplication() { + public boolean freezeReplication() throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.freezeReplication(this.server); } - Activation activation = server.getActivation(); - if (activation instanceof SharedNothingLiveActivation) { - SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation; - liveActivation.freezeReplication(); - return true; + try (AutoCloseable lock = server.managementLock()) { + Activation activation = server.getActivation(); + if (activation instanceof SharedNothingLiveActivation) { + SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation; + liveActivation.freezeReplication(); + return true; + } + return false; } - return false; } private enum AddressInfoTextFormatter { @@ -966,21 +968,25 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public void deleteAddress(String name, boolean force) throws Exception { - checkStarted(); - clearIO(); - try { - server.removeAddressInfo(new SimpleString(name), null, force); - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.deleteAddressSuccess(name); + // delete might be a long running task, we ensure only one large task running + try (AutoCloseable lock = server.managementLock()) { + checkStarted(); + + clearIO(); + try { + server.removeAddressInfo(new SimpleString(name), null, force); + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.deleteAddressSuccess(name); + } + } catch (ActiveMQException e) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.deleteAddressFailure(name); + } + throw new IllegalStateException(e.getMessage()); + } finally { + blockOnIO(); } - } catch (ActiveMQException e) { - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.deleteAddressFailure(name); - } - throw new IllegalStateException(e.getMessage()); - } finally { - blockOnIO(); } } @@ -1003,10 +1009,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.createQueue(new QueueConfiguration(name) - .setAddress(address) - .setFilterString(filterStr) - .setDurable(durable)); + server.createQueue(new QueueConfiguration(name).setAddress(address).setFilterString(filterStr).setDurable(durable)); } finally { blockOnIO(); } @@ -1232,10 +1235,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active boolean autoCreateAddress, long ringSize) throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable, - maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, - lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, - autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize); + AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize); } checkStarted(); @@ -1247,34 +1247,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - final Queue queue = server.createQueue(new QueueConfiguration(name) - .setAddress(address) - .setRoutingType(RoutingType.valueOf(routingType.toUpperCase())) - .setFilterString(filter) - .setDurable(durable) - .setMaxConsumers(maxConsumers) - .setPurgeOnNoConsumers(purgeOnNoConsumers) - .setExclusive(exclusive) - .setGroupRebalance(groupRebalance) - .setGroupBuckets(groupBuckets) - .setGroupFirstKey(groupFirstKey) - .setLastValue(lastValue) - .setLastValueKey(lastValueKey) - .setNonDestructive(nonDestructive) - .setConsumersBeforeDispatch(consumersBeforeDispatch) - .setDelayBeforeDispatch(delayBeforeDispatch) - .setAutoDelete(autoDelete) - .setAutoDeleteDelay(autoDeleteDelay) - .setAutoDeleteMessageCount(autoDeleteMessageCount) - .setAutoCreateAddress(autoCreateAddress) - .setRingSize(ringSize)); + final Queue queue = server.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.valueOf(routingType.toUpperCase())).setFilterString(filter).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setExclusive(exclusive).setGroupRebalance(groupRebalance).setGroupBuckets(groupBuckets).setGroupFirstKey(groupFirstKey).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch).setAutoDelete(autoDelete).setAutoDeleteDelay(autoDeleteDelay).setAutoDeleteMessageCount(autoDeleteMessageCount).setAutoCreateAddress(autoCreateAddress).setRingSize(ringSize)); if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.createQueueSuccess( name, address, routingType); + AuditLogger.createQueueSuccess(name, address, routingType); } return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString(); } catch (ActiveMQException e) { if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.createQueueFailure( name, address, routingType); + AuditLogger.createQueueFailure(name, address, routingType); } throw new IllegalStateException(e.getMessage()); } finally { @@ -1313,7 +1293,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public String updateQueue(String queueConfigurationAsJson) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.updateQueue(this.server, queueConfigurationAsJson); } @@ -1414,8 +1393,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active String user, Long ringSize) throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers, - exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize); + AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize); } checkStarted(); @@ -1592,27 +1570,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public void destroyQueue(final String name, final boolean removeConsumers, final boolean forceAutoDeleteAddress) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress); - } - checkStarted(); + // destroy might be a long running task, we prevent multiple running tasks in this case + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress); + } + checkStarted(); - clearIO(); - try { - SimpleString queueName = new SimpleString(name); + clearIO(); try { - server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress); - } catch (Exception e) { - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.destroyQueueFailure(name); + SimpleString queueName = new SimpleString(name); + try { + server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress); + } catch (Exception e) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.destroyQueueFailure(name); + } + throw e; } - throw e; + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.destroyQueueSuccess(name); + } + } finally { + blockOnIO(); } - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.destroyQueueSuccess(name); - } - } finally { - blockOnIO(); } } @@ -2149,56 +2130,64 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64); - } - checkStarted(); - - clearIO(); - try { - List xids = resourceManager.getPreparedTransactions(); - - for (Xid xid : xids) { - if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) { - Transaction transaction = resourceManager.removeTransaction(xid, null); - transaction.commit(false); - long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true); - storageManager.waitOnOperations(); - resourceManager.putHeuristicCompletion(recordID, xid, true); - return true; - } + // commit might be a long running task if dealing with a large transaction + // ensuring a single one just in case + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64); + } + checkStarted(); + + clearIO(); + try { + List xids = resourceManager.getPreparedTransactions(); + + for (Xid xid : xids) { + if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) { + Transaction transaction = resourceManager.removeTransaction(xid, null); + transaction.commit(false); + long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true); + storageManager.waitOnOperations(); + resourceManager.putHeuristicCompletion(recordID, xid, true); + return true; + } + } + return false; + } finally { + blockOnIO(); } - return false; - } finally { - blockOnIO(); } } @Override public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64); - } - checkStarted(); - - clearIO(); - try { - - List xids = resourceManager.getPreparedTransactions(); - - for (Xid xid : xids) { - if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) { - Transaction transaction = resourceManager.removeTransaction(xid, null); - transaction.rollback(); - long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false); - server.getStorageManager().waitOnOperations(); - resourceManager.putHeuristicCompletion(recordID, xid, false); - return true; - } + // rollback might be a long running task if dealing with a large transaction + // ensuring a single task just in case + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64); + } + checkStarted(); + + clearIO(); + try { + + List xids = resourceManager.getPreparedTransactions(); + + for (Xid xid : xids) { + if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) { + Transaction transaction = resourceManager.removeTransaction(xid, null); + transaction.rollback(); + long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false); + server.getStorageManager().waitOnOperations(); + resourceManager.putHeuristicCompletion(recordID, xid, false); + return true; + } + } + return false; + } finally { + blockOnIO(); } - return false; - } finally { - blockOnIO(); } } @@ -2278,149 +2267,170 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public boolean closeConsumerConnectionsForAddress(final String address) { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.closeConsumerConnectionsForAddress(this.server, address); - } - boolean closed = false; - checkStarted(); + // this could be a long running task, ensuring a single task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.closeConsumerConnectionsForAddress(this.server, address); + } + boolean closed = false; + checkStarted(); - clearIO(); - try { - for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) { - if (binding instanceof LocalQueueBinding) { - Queue queue = ((LocalQueueBinding) binding).getQueue(); - for (Consumer consumer : queue.getConsumers()) { - if (consumer instanceof ServerConsumer) { - ServerConsumer serverConsumer = (ServerConsumer) consumer; - RemotingConnection connection = null; + clearIO(); + try { + for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) { + if (binding instanceof LocalQueueBinding) { + Queue queue = ((LocalQueueBinding) binding).getQueue(); + for (Consumer consumer : queue.getConsumers()) { + if (consumer instanceof ServerConsumer) { + ServerConsumer serverConsumer = (ServerConsumer) consumer; + RemotingConnection connection = null; - for (RemotingConnection potentialConnection : remotingService.getConnections()) { - if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) { - connection = potentialConnection; + for (RemotingConnection potentialConnection : remotingService.getConnections()) { + if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) { + connection = potentialConnection; + } } - } - if (connection != null) { - remotingService.removeConnection(connection.getID()); - connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address)); - closed = true; + if (connection != null) { + remotingService.removeConnection(connection.getID()); + connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address)); + closed = true; + } } } } } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e); + } finally { + blockOnIO(); } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e); - } finally { - blockOnIO(); + return closed; + } catch (Throwable e) { + throw new RuntimeException(e.getMessage(), e); } - return closed; } @Override public boolean closeConnectionsForUser(final String userName) { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.closeConnectionsForUser(this.server, userName); - } - boolean closed = false; - checkStarted(); + // possibly a long running task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.closeConnectionsForUser(this.server, userName); + } + boolean closed = false; + checkStarted(); - clearIO(); - try { - for (ServerSession serverSession : server.getSessions()) { - if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) { - RemotingConnection connection = null; + clearIO(); + try { + for (ServerSession serverSession : server.getSessions()) { + if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) { + RemotingConnection connection = null; - for (RemotingConnection potentialConnection : remotingService.getConnections()) { - if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) { - connection = potentialConnection; + for (RemotingConnection potentialConnection : remotingService.getConnections()) { + if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) { + connection = potentialConnection; + } + } + + if (connection != null) { + remotingService.removeConnection(connection.getID()); + connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName)); + closed = true; } } - - if (connection != null) { - remotingService.removeConnection(connection.getID()); - connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName)); - closed = true; - } } + } finally { + blockOnIO(); } - } finally { - blockOnIO(); + return closed; + } catch (Throwable e) { + throw new RuntimeException(e.getMessage(), e); } - return closed; } @Override public boolean closeConnectionWithID(final String ID) { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.closeConnectionWithID(this.server, ID); - } - checkStarted(); - - clearIO(); - try { - for (RemotingConnection connection : remotingService.getConnections()) { - if (connection.getID().toString().equals(ID)) { - remotingService.removeConnection(connection.getID()); - connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID)); - return true; - } + // possibly a long running task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.closeConnectionWithID(this.server, ID); } - } finally { - blockOnIO(); + checkStarted(); + + clearIO(); + try { + for (RemotingConnection connection : remotingService.getConnections()) { + if (connection.getID().toString().equals(ID)) { + remotingService.removeConnection(connection.getID()); + connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID)); + return true; + } + } + } finally { + blockOnIO(); + } + return false; + } catch (Throwable e) { + throw new RuntimeException(e.getMessage(), e); } - return false; } @Override public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.closeSessionWithID(this.server, connectionID, ID); - } - checkStarted(); - - clearIO(); - try { - List sessions = server.getSessions(connectionID); - for (ServerSession session : sessions) { - if (session.getName().equals(ID.toString())) { - session.close(true); - return true; - } + // possibly a long running task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.closeSessionWithID(this.server, connectionID, ID); } + checkStarted(); - } finally { - blockOnIO(); + clearIO(); + try { + List sessions = server.getSessions(connectionID); + for (ServerSession session : sessions) { + if (session.getName().equals(ID.toString())) { + session.close(true); + return true; + } + } + + } finally { + blockOnIO(); + } + return false; } - return false; } @Override public boolean closeConsumerWithID(final String sessionID, final String ID) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.closeConsumerWithID(this.server, sessionID, ID); - } - checkStarted(); + // possibly a long running task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.closeConsumerWithID(this.server, sessionID, ID); + } + checkStarted(); - clearIO(); - try { - Set sessions = server.getSessions(); - for (ServerSession session : sessions) { - if (session.getName().equals(sessionID.toString())) { - Set serverConsumers = session.getServerConsumers(); - for (ServerConsumer serverConsumer : serverConsumers) { - if (serverConsumer.sequentialID() == Long.valueOf(ID)) { - serverConsumer.disconnect(); - return true; + clearIO(); + try { + Set sessions = server.getSessions(); + for (ServerSession session : sessions) { + if (session.getName().equals(sessionID.toString())) { + Set serverConsumers = session.getServerConsumers(); + for (ServerConsumer serverConsumer : serverConsumers) { + if (serverConsumer.sequentialID() == Long.valueOf(ID)) { + serverConsumer.disconnect(); + return true; + } } } } - } - } finally { - blockOnIO(); + } finally { + blockOnIO(); + } + return false; } - return false; } @Override @@ -4116,27 +4126,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active @Override public void scaleDown(String connector) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.scaleDown(this.server, connector); - } - checkStarted(); - - clearIO(); - HAPolicy haPolicy = server.getHAPolicy(); - if (haPolicy instanceof LiveOnlyPolicy) { - LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy; - - if (liveOnlyPolicy.getScaleDownPolicy() == null) { - liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy()); + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.scaleDown(this.server, connector); } + checkStarted(); - liveOnlyPolicy.getScaleDownPolicy().setEnabled(true); + clearIO(); + HAPolicy haPolicy = server.getHAPolicy(); + if (haPolicy instanceof LiveOnlyPolicy) { + LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy; - if (connector != null) { - liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector); + if (liveOnlyPolicy.getScaleDownPolicy() == null) { + liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy()); + } + + liveOnlyPolicy.getScaleDownPolicy().setEnabled(true); + + if (connector != null) { + liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector); + } + + server.fail(true); } - - server.fail(true); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index d88051e392..9044936d63 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -431,10 +431,15 @@ public class AddressControlImpl extends AbstractControl implements AddressContro @Override public long getMessageCount() { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.getMessageCount(this.addressInfo); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getMessageCount(this.addressInfo); + } + return getMessageCount(DurabilityType.ALL); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); } - return getMessageCount(DurabilityType.ALL); } @Override @@ -472,14 +477,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro final String user, final String password, boolean createMessageId) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****"); - } - try { - return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId); - } catch (Exception e) { - e.printStackTrace(); - throw new IllegalStateException(e.getMessage()); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****"); + } + try { + return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException(e.getMessage()); + } } } @@ -585,20 +593,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro @Override public boolean clearDuplicateIdCache() { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.clearDuplicateIdCache(this.addressInfo); - } - DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName()); - try { - if (cache != null) { - cache.clear(); - return true; + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.clearDuplicateIdCache(this.addressInfo); + } + DuplicateIDCache cache = ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName()); + try { + if (cache != null) { + cache.clear(); + return true; + } + } catch (Exception e) { + logger.debug("Failed to clear duplicate ID cache", e); } - } catch (Exception e) { - logger.debug("Failed to clear duplicate ID cache", e); - } - return false; + return false; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } @Override @@ -627,37 +640,41 @@ public class AddressControlImpl extends AbstractControl implements AddressContro @Override public long purge() throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.purge(this.addressInfo); - } - clearIO(); - long totalMsgs = 0; - try { - Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName()); - if (bindings != null) { - for (Binding binding : bindings.getBindings()) { - if (binding instanceof QueueBinding) { - totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.purge(this.addressInfo); + } + clearIO(); + long totalMsgs = 0; + try { + Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName()); + if (bindings != null) { + for (Binding binding : bindings.getBindings()) { + if (binding instanceof QueueBinding) { + totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED); + } } } + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.purgeAddressSuccess(addressInfo.getName().toString()); + } + } catch (Throwable t) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.purgeAddressFailure(addressInfo.getName().toString()); + } + throw new IllegalStateException(t.getMessage()); + } finally { + blockOnIO(); } - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.purgeAddressSuccess(addressInfo.getName().toString()); - } - } catch (Throwable t) { - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.purgeAddressFailure(addressInfo.getName().toString()); - } - throw new IllegalStateException(t.getMessage()); - } finally { - blockOnIO(); - } - return totalMsgs; + return totalMsgs; + } } @Override public void replay(String target, String filter) throws Exception { + // server.replay is already calling the managementLock, no need to do it here server.replay(null, null, this.getAddress(), target, filter); } @@ -669,22 +686,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro Date startScanDate = format.parse(startScan); Date endScanDate = format.parse(endScan); + // server.replay is already calling the managementLock, no need to do it here server.replay(startScanDate, endScanDate, this.getAddress(), target, filter); } private long getMessageCount(final DurabilityType durability) { - long count = 0; - for (String queueName : getQueueNames()) { - Queue queue = server.locateQueue(queueName); - if (queue != null && - (durability == DurabilityType.ALL || - (durability == DurabilityType.DURABLE && queue.isDurable()) || - (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) { - count += queue.getMessageCount(); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + long count = 0; + for (String queueName : getQueueNames()) { + Queue queue = server.locateQueue(queueName); + if (queue != null && (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && queue.isDurable()) || (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) { + count += queue.getMessageCount(); + } } + return count; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); } - return count; } private void checkStarted() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 1fbefc3847..9bced82671 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -740,32 +740,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public Map[] listScheduledMessages() throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.listScheduledMessages(queue); - } - checkStarted(); + // it could be a long running task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.listScheduledMessages(queue); + } + checkStarted(); - clearIO(); - try { - List refs = queue.getScheduledMessages(); - return convertMessagesToMaps(refs); - } finally { - blockOnIO(); + clearIO(); + try { + List refs = queue.getScheduledMessages(); + return convertMessagesToMaps(refs); + } finally { + blockOnIO(); + } } } @Override public String listScheduledMessagesAsJSON() throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.listScheduledMessagesAsJSON(queue); - } - checkStarted(); + // it could be a long running task + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.listScheduledMessagesAsJSON(queue); + } + checkStarted(); - clearIO(); - try { - return QueueControlImpl.toJSON(listScheduledMessages()); - } finally { - blockOnIO(); + clearIO(); + try { + return QueueControlImpl.toJSON(listScheduledMessages()); + } finally { + blockOnIO(); + } } } @@ -969,33 +975,36 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } private Map internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception { - checkStarted(); + // long running task + try (AutoCloseable lock = server.managementLock()) { + checkStarted(); - clearIO(); + clearIO(); - Map result = new HashMap<>(); - try { - Filter filter = FilterImpl.createFilter(filterStr); - SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr); - if (filter == null && groupByProperty == null) { - result.put(null, getMessageCount()); - } else { - final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize(); - int count = 0; - try (LinkedListIterator iterator = queue.browserIterator()) { - try { - while (iterator.hasNext() && count++ < limit) { - Message message = iterator.next().getMessage(); - internalComputeMessage(result, filter, groupByProperty, message); + Map result = new HashMap<>(); + try { + Filter filter = FilterImpl.createFilter(filterStr); + SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr); + if (filter == null && groupByProperty == null) { + result.put(null, getMessageCount()); + } else { + final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize(); + int count = 0; + try (LinkedListIterator iterator = queue.browserIterator()) { + try { + while (iterator.hasNext() && count++ < limit) { + Message message = iterator.next().getMessage(); + internalComputeMessage(result, filter, groupByProperty, message); + } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing } - } catch (NoSuchElementException ignored) { - // this could happen through paging browsing } } + return result; + } finally { + blockOnIO(); } - return result; - } finally { - blockOnIO(); } } @@ -1019,26 +1028,26 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } private Map internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception { - checkStarted(); + // it could be a long running task + try (AutoCloseable lock = server.managementLock()) { + checkStarted(); - clearIO(); + clearIO(); - Map result = new HashMap<>(); - try { - Filter filter = FilterImpl.createFilter(filterStr); - SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr); - if (filter == null && groupByProperty == null) { - result.put(null, Long.valueOf(getDeliveringCount())); - } else { - Map> deliveringMessages = queue.getDeliveringMessages(); - deliveringMessages.forEach((s, messageReferenceList) -> - messageReferenceList.forEach(messageReference -> - internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage()) - )); + Map result = new HashMap<>(); + try { + Filter filter = FilterImpl.createFilter(filterStr); + SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr); + if (filter == null && groupByProperty == null) { + result.put(null, Long.valueOf(getDeliveringCount())); + } else { + Map> deliveringMessages = queue.getDeliveringMessages(); + deliveringMessages.forEach((s, messageReferenceList) -> messageReferenceList.forEach(messageReference -> internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage()))); + } + return result; + } finally { + blockOnIO(); } - return result; - } finally { - blockOnIO(); } } @@ -1057,18 +1066,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public boolean removeMessage(final long messageID) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.removeMessage(queue, messageID); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.removeMessage(queue, messageID); + } + checkStarted(); - clearIO(); - try { - return queue.deleteReference(messageID); - } catch (ActiveMQException e) { - throw new IllegalStateException(e.getMessage()); - } finally { - blockOnIO(); + clearIO(); + try { + return queue.deleteReference(messageID); + } catch (ActiveMQException e) { + throw new IllegalStateException(e.getMessage()); + } finally { + blockOnIO(); + } } } @@ -1079,30 +1091,33 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public int removeMessages(final int flushLimit, final String filterStr) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.removeMessages(queue, flushLimit, filterStr); - } - checkStarted(); - - clearIO(); - try { - Filter filter = FilterImpl.createFilter(filterStr); - - int removed = 0; - try { - removed = queue.deleteMatchingReferences(flushLimit, filter); - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.removeMessagesSuccess(removed, queue.getName().toString()); - } - } catch (Exception e) { - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.removeMessagesFailure(queue.getName().toString()); - } - throw e; + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.removeMessages(queue, flushLimit, filterStr); + } + checkStarted(); + + clearIO(); + try { + Filter filter = FilterImpl.createFilter(filterStr); + + int removed = 0; + try { + removed = queue.deleteMatchingReferences(flushLimit, filter); + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.removeMessagesSuccess(removed, queue.getName().toString()); + } + } catch (Exception e) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.removeMessagesFailure(queue.getName().toString()); + } + throw e; + } + return removed; + } finally { + blockOnIO(); } - return removed; - } finally { - blockOnIO(); } } @@ -1113,87 +1128,99 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public boolean expireMessage(final long messageID) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.expireMessage(queue, messageID); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.expireMessage(queue, messageID); + } + checkStarted(); - clearIO(); - try { - return queue.expireReference(messageID); - } finally { - blockOnIO(); + clearIO(); + try { + return queue.expireReference(messageID); + } finally { + blockOnIO(); + } } } @Override public int expireMessages(final String filterStr) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.expireMessages(queue, filterStr); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.expireMessages(queue, filterStr); + } + checkStarted(); - clearIO(); - try { - Filter filter = FilterImpl.createFilter(filterStr); - return queue.expireReferences(filter); - } catch (ActiveMQException e) { - throw new IllegalStateException(e.getMessage()); - } finally { - blockOnIO(); + clearIO(); + try { + Filter filter = FilterImpl.createFilter(filterStr); + return queue.expireReferences(filter); + } catch (ActiveMQException e) { + throw new IllegalStateException(e.getMessage()); + } finally { + blockOnIO(); + } } } @Override public boolean retryMessage(final long messageID) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.retryMessage(queue, messageID); - } + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.retryMessage(queue, messageID); + } - checkStarted(); - clearIO(); + checkStarted(); + clearIO(); - try { - Filter singleMessageFilter = new Filter() { - @Override - public boolean match(Message message) { - return message.getMessageID() == messageID; - } + try { + Filter singleMessageFilter = new Filter() { + @Override + public boolean match(Message message) { + return message.getMessageID() == messageID; + } - @Override - public boolean match(Map map) { - return false; - } + @Override + public boolean match(Map map) { + return false; + } - @Override - public boolean match(Filterable filterable) { - return false; - } + @Override + public boolean match(Filterable filterable) { + return false; + } - @Override - public SimpleString getFilterString() { - return new SimpleString("custom filter for MESSAGEID= messageID"); - } - }; + @Override + public SimpleString getFilterString() { + return new SimpleString("custom filter for MESSAGEID= messageID"); + } + }; - return queue.retryMessages(singleMessageFilter) > 0; - } finally { - blockOnIO(); + return queue.retryMessages(singleMessageFilter) > 0; + } finally { + blockOnIO(); + } } } @Override public int retryMessages() throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.retryMessages(queue); - } - checkStarted(); - clearIO(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.retryMessages(queue); + } + checkStarted(); + clearIO(); - try { - return queue.retryMessages(null); - } finally { - blockOnIO(); + try { + return queue.retryMessages(null); + } finally { + blockOnIO(); + } } } @@ -1206,22 +1233,25 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { public boolean moveMessage(final long messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates); - } - checkStarted(); - - clearIO(); - try { - Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName)); - - if (binding == null) { - throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates); } + checkStarted(); - return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates); - } finally { - blockOnIO(); + clearIO(); + try { + Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName)); + + if (binding == null) { + throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); + } + + return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates); + } finally { + blockOnIO(); + } } } @@ -1245,25 +1275,28 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { final String otherQueueName, final boolean rejectDuplicates, final int messageCount) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount); - } - checkStarted(); - - clearIO(); - try { - Filter filter = FilterImpl.createFilter(filterStr); - - Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName)); - - if (binding == null) { - throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount); } + checkStarted(); - int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding); - return retValue; - } finally { - blockOnIO(); + clearIO(); + try { + Filter filter = FilterImpl.createFilter(filterStr); + + Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName)); + + if (binding == null) { + throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName); + } + + int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding); + return retValue; + } finally { + blockOnIO(); + } } } @@ -1277,18 +1310,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr); + } + checkStarted(); - clearIO(); - try { - Filter filter = FilterImpl.createFilter(filterStr); + clearIO(); + try { + Filter filter = FilterImpl.createFilter(filterStr); - return queue.sendMessagesToDeadLetterAddress(filter); - } finally { - blockOnIO(); + return queue.sendMessagesToDeadLetterAddress(filter); + } finally { + blockOnIO(); + } } } @@ -1310,35 +1346,41 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { final String user, final String password, boolean createMessageId) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****"); - } - try { - String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID()); - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.sendMessageSuccess(queue.getName().toString(), user); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****"); } - return s; - } catch (Exception e) { - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.sendMessageFailure(queue.getName().toString(), user); + try { + String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID()); + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.sendMessageSuccess(queue.getName().toString(), user); + } + return s; + } catch (Exception e) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.sendMessageFailure(queue.getName().toString(), user); + } + throw new IllegalStateException(e.getMessage()); } - throw new IllegalStateException(e.getMessage()); } } @Override public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.sendMessageToDeadLetterAddress(queue, messageID); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.sendMessageToDeadLetterAddress(queue, messageID); + } + checkStarted(); - clearIO(); - try { - return queue.sendMessageToDeadLetterAddress(messageID); - } finally { - blockOnIO(); + clearIO(); + try { + return queue.sendMessageToDeadLetterAddress(messageID); + } finally { + blockOnIO(); + } } } @@ -1554,52 +1596,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public CompositeData[] browse(int page, int pageSize, String filter) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.browse(queue, page, pageSize); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.browse(queue, page, pageSize); + } + checkStarted(); - clearIO(); - try { - long index = 0; - long start = (long) (page - 1) * pageSize; - long end = Math.min(page * pageSize, queue.getMessageCount()); + clearIO(); + try { + long index = 0; + long start = (long) (page - 1) * pageSize; + long end = Math.min(page * pageSize, queue.getMessageCount()); - ArrayList c = new ArrayList<>(); - Filter thefilter = FilterImpl.createFilter(filter); + ArrayList c = new ArrayList<>(); + Filter thefilter = FilterImpl.createFilter(filter); - final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); - try (LinkedListIterator iterator = queue.browserIterator()) { - try { - while (iterator.hasNext() && index < end) { - MessageReference ref = iterator.next(); - if (thefilter == null || thefilter.match(ref.getMessage())) { - if (index >= start) { - c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount())); + final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); + try (LinkedListIterator iterator = queue.browserIterator()) { + try { + while (iterator.hasNext() && index < end) { + MessageReference ref = iterator.next(); + if (thefilter == null || thefilter.match(ref.getMessage())) { + if (index >= start) { + c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount())); + } + //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message + index++; } - //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message - index++; } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing } - } catch (NoSuchElementException ignored) { - // this could happen through paging browsing - } - CompositeData[] rc = new CompositeData[c.size()]; - c.toArray(rc); - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size()); + CompositeData[] rc = new CompositeData[c.size()]; + c.toArray(rc); + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size()); + } + return rc; } - return rc; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.browseMessagesFailure(queue.getName().toString()); + } + throw new IllegalStateException(e.getMessage()); + } finally { + blockOnIO(); } - } catch (Exception e) { - logger.warn(e.getMessage(), e); - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.browseMessagesFailure(queue.getName().toString()); - } - throw new IllegalStateException(e.getMessage()); - } finally { - blockOnIO(); } } @@ -1609,46 +1654,49 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override public CompositeData[] browse(String filter) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.browse(queue, filter); - } - checkStarted(); + // this is a critical task, we need to prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.browse(queue, filter); + } + checkStarted(); - clearIO(); - try { - final AddressSettings addressSettings = addressSettingsRepository.getMatch(address); - final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit(); - final int limit = addressSettings.getManagementBrowsePageSize(); - int currentPageSize = 0; - ArrayList c = new ArrayList<>(); - Filter thefilter = FilterImpl.createFilter(filter); - try (LinkedListIterator iterator = queue.browserIterator()) { - try { - while (iterator.hasNext() && currentPageSize++ < limit) { - MessageReference ref = iterator.next(); - if (thefilter == null || thefilter.match(ref.getMessage())) { - c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount())); + clearIO(); + try { + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address); + final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit(); + final int limit = addressSettings.getManagementBrowsePageSize(); + int currentPageSize = 0; + ArrayList c = new ArrayList<>(); + Filter thefilter = FilterImpl.createFilter(filter); + try (LinkedListIterator iterator = queue.browserIterator()) { + try { + while (iterator.hasNext() && currentPageSize++ < limit) { + MessageReference ref = iterator.next(); + if (thefilter == null || thefilter.match(ref.getMessage())) { + c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount())); + } } + } catch (NoSuchElementException ignored) { + // this could happen through paging browsing } - } catch (NoSuchElementException ignored) { - // this could happen through paging browsing - } - CompositeData[] rc = new CompositeData[c.size()]; - c.toArray(rc); - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize); + CompositeData[] rc = new CompositeData[c.size()]; + c.toArray(rc); + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize); + } + return rc; } - return rc; + } catch (ActiveMQException e) { + if (AuditLogger.isResourceLoggingEnabled()) { + AuditLogger.browseMessagesFailure(queue.getName().toString()); + } + throw new IllegalStateException(e.getMessage()); + } finally { + blockOnIO(); } - } catch (ActiveMQException e) { - if (AuditLogger.isResourceLoggingEnabled()) { - AuditLogger.browseMessagesFailure(queue.getName().toString()); - } - throw new IllegalStateException(e.getMessage()); - } finally { - blockOnIO(); } } @@ -1714,32 +1762,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public String listGroupsAsJSON() throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.listGroupsAsJSON(queue); - } - checkStarted(); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.listGroupsAsJSON(queue); + } + checkStarted(); - clearIO(); - try { - Map groups = queue.getGroups(); + clearIO(); + try { + Map groups = queue.getGroups(); - JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder(); + JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder(); - for (Map.Entry group : groups.entrySet()) { + for (Map.Entry group : groups.entrySet()) { - if (group.getValue() instanceof ServerConsumer) { - ServerConsumer serverConsumer = (ServerConsumer) group.getValue(); + if (group.getValue() instanceof ServerConsumer) { + ServerConsumer serverConsumer = (ServerConsumer) group.getValue(); - JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime()); + JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime()); + + jsonArray.add(obj); + } - jsonArray.add(obj); } + return jsonArray.build().toString(); + } finally { + blockOnIO(); } - - return jsonArray.build().toString(); - } finally { - blockOnIO(); } } @@ -1969,31 +2020,37 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public void deliverScheduledMessages(String filter) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.deliverScheduledMessage(queue, filter); - } - checkStarted(); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.deliverScheduledMessage(queue, filter); + } + checkStarted(); - clearIO(); - try { - queue.deliverScheduledMessages(filter); - } finally { - blockOnIO(); + clearIO(); + try { + queue.deliverScheduledMessages(filter); + } finally { + blockOnIO(); + } } } @Override public void deliverScheduledMessage(long messageId) throws Exception { - if (AuditLogger.isBaseLoggingEnabled()) { - AuditLogger.deliverScheduledMessage(queue, messageId); - } - checkStarted(); + // prevent parallel tasks running + try (AutoCloseable lock = server.managementLock()) { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.deliverScheduledMessage(queue, messageId); + } + checkStarted(); - clearIO(); - try { - queue.deliverScheduledMessage(messageId); - } finally { - blockOnIO(); + clearIO(); + try { + queue.deliverScheduledMessage(messageId); + } finally { + blockOnIO(); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index bcfab14924..9e122d670e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -50,6 +50,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -277,7 +278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { /** Certain management operations shouldn't use more than one thread. * this semaphore is used to guarantee a single thread used. */ - private final Semaphore managementSemaphore = new Semaphore(1); + private final ReentrantLock managementLock = new ReentrantLock(); /** * This is a thread pool for io tasks only. @@ -522,7 +523,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (replayManager == null) { throw ActiveMQMessageBundle.BUNDLE.noRetention(); } - replayManager.replay(start, end, address, target, filter); + try (AutoCloseable lock = managementLock()) { + replayManager.replay(start, end, address, target, filter); + } } /** @@ -4626,10 +4629,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public AutoCloseable managementLock() throws Exception { - if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) { + if (!managementLock.tryLock(1, TimeUnit.MINUTES)) { throw ActiveMQMessageBundle.BUNDLE.managementBusy(); } else { - return managementSemaphore::release; + return managementLock::unlock; } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java index bb46c337ea..a425b83b6f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/replay/ReplayManager.java @@ -69,23 +69,7 @@ public class ReplayManager { this.retentionFolder = server.getConfiguration().getJournalRetentionLocation(); } - public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception { - - if (running.compareAndSet(false, true)) { - try { - actualReplay(start, end, sourceAddress, targetAddress, filter); - } catch (Exception e) { - logger.warn(e.getMessage()); - throw e; - } finally { - running.set(false); - } - } else { - throw new RuntimeException("Replay manager is currently busy with another operation"); - } - } - - private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception { + public void replay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception { logger.debug("Replay::{}", sourceAddress); if (sourceAddress == null) {