ARTEMIS-4116 Management lock to avoid multiple long running tasks running from user requests

This commit is contained in:
Clebert Suconic 2022-12-15 13:55:03 -05:00 committed by clebertsuconic
parent b565a8a7b9
commit 4550fcf47c
6 changed files with 699 additions and 623 deletions

View File

@ -526,7 +526,7 @@ public interface ActiveMQServerControl {
// Operations ----------------------------------------------------
@Operation(desc = "Isolate the broker", impact = MBeanOperationInfo.ACTION)
boolean freezeReplication();
boolean freezeReplication() throws Exception;
@Operation(desc = "Create an address", impact = MBeanOperationInfo.ACTION)
String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@ -1905,7 +1905,7 @@ public interface ActiveMQServerControl {
* Returns the names of the queues created on this server with the given routing-type.
*/
@Operation(desc = "Names of the queues created on this server with the given routing-type (i.e. ANYCAST or MULTICAST)", impact = MBeanOperationInfo.INFO)
String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType);
String[] getQueueNames(@Parameter(name = "routingType", desc = "The routing type, MULTICAST or ANYCAST") String routingType) throws Exception;
/**
* Returns the names of the cluster-connections deployed on this server.

View File

@ -826,17 +826,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
public boolean freezeReplication() {
public boolean freezeReplication() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.freezeReplication(this.server);
}
Activation activation = server.getActivation();
if (activation instanceof SharedNothingLiveActivation) {
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
liveActivation.freezeReplication();
return true;
try (AutoCloseable lock = server.managementLock()) {
Activation activation = server.getActivation();
if (activation instanceof SharedNothingLiveActivation) {
SharedNothingLiveActivation liveActivation = (SharedNothingLiveActivation) activation;
liveActivation.freezeReplication();
return true;
}
return false;
}
return false;
}
private enum AddressInfoTextFormatter {
@ -966,21 +968,25 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void deleteAddress(String name, boolean force) throws Exception {
checkStarted();
clearIO();
try {
server.removeAddressInfo(new SimpleString(name), null, force);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.deleteAddressSuccess(name);
// delete might be a long running task, we ensure only one large task running
try (AutoCloseable lock = server.managementLock()) {
checkStarted();
clearIO();
try {
server.removeAddressInfo(new SimpleString(name), null, force);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.deleteAddressSuccess(name);
}
} catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.deleteAddressFailure(name);
}
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
} catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.deleteAddressFailure(name);
}
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
}
@ -1003,10 +1009,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
server.createQueue(new QueueConfiguration(name)
.setAddress(address)
.setFilterString(filterStr)
.setDurable(durable));
server.createQueue(new QueueConfiguration(name).setAddress(address).setFilterString(filterStr).setDurable(durable));
} finally {
blockOnIO();
}
@ -1232,10 +1235,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
boolean autoCreateAddress,
long ringSize) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable,
maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey,
lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch,
autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
AuditLogger.createQueue(this.server, null, null, address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
}
checkStarted();
@ -1247,34 +1247,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
final Queue queue = server.createQueue(new QueueConfiguration(name)
.setAddress(address)
.setRoutingType(RoutingType.valueOf(routingType.toUpperCase()))
.setFilterString(filter)
.setDurable(durable)
.setMaxConsumers(maxConsumers)
.setPurgeOnNoConsumers(purgeOnNoConsumers)
.setExclusive(exclusive)
.setGroupRebalance(groupRebalance)
.setGroupBuckets(groupBuckets)
.setGroupFirstKey(groupFirstKey)
.setLastValue(lastValue)
.setLastValueKey(lastValueKey)
.setNonDestructive(nonDestructive)
.setConsumersBeforeDispatch(consumersBeforeDispatch)
.setDelayBeforeDispatch(delayBeforeDispatch)
.setAutoDelete(autoDelete)
.setAutoDeleteDelay(autoDeleteDelay)
.setAutoDeleteMessageCount(autoDeleteMessageCount)
.setAutoCreateAddress(autoCreateAddress)
.setRingSize(ringSize));
final Queue queue = server.createQueue(new QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.valueOf(routingType.toUpperCase())).setFilterString(filter).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setExclusive(exclusive).setGroupRebalance(groupRebalance).setGroupBuckets(groupBuckets).setGroupFirstKey(groupFirstKey).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch).setAutoDelete(autoDelete).setAutoDeleteDelay(autoDeleteDelay).setAutoDeleteMessageCount(autoDeleteMessageCount).setAutoCreateAddress(autoCreateAddress).setRingSize(ringSize));
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.createQueueSuccess( name, address, routingType);
AuditLogger.createQueueSuccess(name, address, routingType);
}
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
} catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.createQueueFailure( name, address, routingType);
AuditLogger.createQueueFailure(name, address, routingType);
}
throw new IllegalStateException(e.getMessage());
} finally {
@ -1313,7 +1293,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public String updateQueue(String queueConfigurationAsJson) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.updateQueue(this.server, queueConfigurationAsJson);
}
@ -1414,8 +1393,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
String user,
Long ringSize) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers,
exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
}
checkStarted();
@ -1592,27 +1570,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void destroyQueue(final String name, final boolean removeConsumers, final boolean forceAutoDeleteAddress) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
}
checkStarted();
// destroy might be a long running task, we prevent multiple running tasks in this case
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.destroyQueue(this.server, null, null, name, removeConsumers, forceAutoDeleteAddress);
}
checkStarted();
clearIO();
try {
SimpleString queueName = new SimpleString(name);
clearIO();
try {
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.destroyQueueFailure(name);
SimpleString queueName = new SimpleString(name);
try {
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, forceAutoDeleteAddress);
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.destroyQueueFailure(name);
}
throw e;
}
throw e;
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.destroyQueueSuccess(name);
}
} finally {
blockOnIO();
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.destroyQueueSuccess(name);
}
} finally {
blockOnIO();
}
}
@ -2149,56 +2130,64 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
}
checkStarted();
clearIO();
try {
List<Xid> xids = resourceManager.getPreparedTransactions();
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.commit(false);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
storageManager.waitOnOperations();
resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
}
// commit might be a long running task if dealing with a large transaction
// ensuring a single one just in case
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.commitPreparedTransaction(this.server, transactionAsBase64);
}
checkStarted();
clearIO();
try {
List<Xid> xids = resourceManager.getPreparedTransactions();
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.commit(false);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
storageManager.waitOnOperations();
resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
}
}
return false;
} finally {
blockOnIO();
}
return false;
} finally {
blockOnIO();
}
}
@Override
public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
}
checkStarted();
clearIO();
try {
List<Xid> xids = resourceManager.getPreparedTransactions();
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.rollback();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
server.getStorageManager().waitOnOperations();
resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
}
// rollback might be a long running task if dealing with a large transaction
// ensuring a single task just in case
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.rollbackPreparedTransaction(this.server, transactionAsBase64);
}
checkStarted();
clearIO();
try {
List<Xid> xids = resourceManager.getPreparedTransactions();
for (Xid xid : xids) {
if (XidImpl.toBase64String(xid).equals(transactionAsBase64)) {
Transaction transaction = resourceManager.removeTransaction(xid, null);
transaction.rollback();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
server.getStorageManager().waitOnOperations();
resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
}
}
return false;
} finally {
blockOnIO();
}
return false;
} finally {
blockOnIO();
}
}
@ -2278,149 +2267,170 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public boolean closeConsumerConnectionsForAddress(final String address) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
}
boolean closed = false;
checkStarted();
// this could be a long running task, ensuring a single task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConsumerConnectionsForAddress(this.server, address);
}
boolean closed = false;
checkStarted();
clearIO();
try {
for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
for (Consumer consumer : queue.getConsumers()) {
if (consumer instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) consumer;
RemotingConnection connection = null;
clearIO();
try {
for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address))) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
for (Consumer consumer : queue.getConsumers()) {
if (consumer instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) consumer;
RemotingConnection connection = null;
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
connection = potentialConnection;
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
connection = potentialConnection;
}
}
}
if (connection != null) {
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
closed = true;
if (connection != null) {
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
closed = true;
}
}
}
}
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
} finally {
blockOnIO();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
} finally {
blockOnIO();
return closed;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
}
return closed;
}
@Override
public boolean closeConnectionsForUser(final String userName) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConnectionsForUser(this.server, userName);
}
boolean closed = false;
checkStarted();
// possibly a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConnectionsForUser(this.server, userName);
}
boolean closed = false;
checkStarted();
clearIO();
try {
for (ServerSession serverSession : server.getSessions()) {
if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
RemotingConnection connection = null;
clearIO();
try {
for (ServerSession serverSession : server.getSessions()) {
if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) {
RemotingConnection connection = null;
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
connection = potentialConnection;
for (RemotingConnection potentialConnection : remotingService.getConnections()) {
if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) {
connection = potentialConnection;
}
}
if (connection != null) {
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
closed = true;
}
}
if (connection != null) {
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
closed = true;
}
}
} finally {
blockOnIO();
}
} finally {
blockOnIO();
return closed;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
}
return closed;
}
@Override
public boolean closeConnectionWithID(final String ID) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConnectionWithID(this.server, ID);
}
checkStarted();
clearIO();
try {
for (RemotingConnection connection : remotingService.getConnections()) {
if (connection.getID().toString().equals(ID)) {
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
return true;
}
// possibly a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConnectionWithID(this.server, ID);
}
} finally {
blockOnIO();
checkStarted();
clearIO();
try {
for (RemotingConnection connection : remotingService.getConnections()) {
if (connection.getID().toString().equals(ID)) {
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionWithIDClosedByManagement(ID));
return true;
}
}
} finally {
blockOnIO();
}
return false;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
}
return false;
}
@Override
public boolean closeSessionWithID(final String connectionID, final String ID) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeSessionWithID(this.server, connectionID, ID);
}
checkStarted();
clearIO();
try {
List<ServerSession> sessions = server.getSessions(connectionID);
for (ServerSession session : sessions) {
if (session.getName().equals(ID.toString())) {
session.close(true);
return true;
}
// possibly a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeSessionWithID(this.server, connectionID, ID);
}
checkStarted();
} finally {
blockOnIO();
clearIO();
try {
List<ServerSession> sessions = server.getSessions(connectionID);
for (ServerSession session : sessions) {
if (session.getName().equals(ID.toString())) {
session.close(true);
return true;
}
}
} finally {
blockOnIO();
}
return false;
}
return false;
}
@Override
public boolean closeConsumerWithID(final String sessionID, final String ID) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
}
checkStarted();
// possibly a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.closeConsumerWithID(this.server, sessionID, ID);
}
checkStarted();
clearIO();
try {
Set<ServerSession> sessions = server.getSessions();
for (ServerSession session : sessions) {
if (session.getName().equals(sessionID.toString())) {
Set<ServerConsumer> serverConsumers = session.getServerConsumers();
for (ServerConsumer serverConsumer : serverConsumers) {
if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
serverConsumer.disconnect();
return true;
clearIO();
try {
Set<ServerSession> sessions = server.getSessions();
for (ServerSession session : sessions) {
if (session.getName().equals(sessionID.toString())) {
Set<ServerConsumer> serverConsumers = session.getServerConsumers();
for (ServerConsumer serverConsumer : serverConsumers) {
if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
serverConsumer.disconnect();
return true;
}
}
}
}
}
} finally {
blockOnIO();
} finally {
blockOnIO();
}
return false;
}
return false;
}
@Override
@ -4116,27 +4126,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void scaleDown(String connector) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.scaleDown(this.server, connector);
}
checkStarted();
clearIO();
HAPolicy haPolicy = server.getHAPolicy();
if (haPolicy instanceof LiveOnlyPolicy) {
LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
if (liveOnlyPolicy.getScaleDownPolicy() == null) {
liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.scaleDown(this.server, connector);
}
checkStarted();
liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
clearIO();
HAPolicy haPolicy = server.getHAPolicy();
if (haPolicy instanceof LiveOnlyPolicy) {
LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
if (connector != null) {
liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
if (liveOnlyPolicy.getScaleDownPolicy() == null) {
liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
}
liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
if (connector != null) {
liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector);
}
server.fail(true);
}
server.fail(true);
}
}

View File

@ -431,10 +431,15 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public long getMessageCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getMessageCount(this.addressInfo);
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getMessageCount(this.addressInfo);
}
return getMessageCount(DurabilityType.ALL);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
return getMessageCount(DurabilityType.ALL);
}
@Override
@ -472,14 +477,17 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
final String user,
final String password,
boolean createMessageId) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
}
try {
return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException(e.getMessage());
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
}
try {
return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException(e.getMessage());
}
}
}
@ -585,20 +593,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public boolean clearDuplicateIdCache() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.clearDuplicateIdCache(this.addressInfo);
}
DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
try {
if (cache != null) {
cache.clear();
return true;
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.clearDuplicateIdCache(this.addressInfo);
}
DuplicateIDCache cache = ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
try {
if (cache != null) {
cache.clear();
return true;
}
} catch (Exception e) {
logger.debug("Failed to clear duplicate ID cache", e);
}
} catch (Exception e) {
logger.debug("Failed to clear duplicate ID cache", e);
}
return false;
return false;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
@ -627,37 +640,41 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
@Override
public long purge() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.purge(this.addressInfo);
}
clearIO();
long totalMsgs = 0;
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.purge(this.addressInfo);
}
clearIO();
long totalMsgs = 0;
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
}
}
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
}
} catch (Throwable t) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
}
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
}
} catch (Throwable t) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
}
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
return totalMsgs;
return totalMsgs;
}
}
@Override
public void replay(String target, String filter) throws Exception {
// server.replay is already calling the managementLock, no need to do it here
server.replay(null, null, this.getAddress(), target, filter);
}
@ -669,22 +686,25 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
Date startScanDate = format.parse(startScan);
Date endScanDate = format.parse(endScan);
// server.replay is already calling the managementLock, no need to do it here
server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
}
private long getMessageCount(final DurabilityType durability) {
long count = 0;
for (String queueName : getQueueNames()) {
Queue queue = server.locateQueue(queueName);
if (queue != null &&
(durability == DurabilityType.ALL ||
(durability == DurabilityType.DURABLE && queue.isDurable()) ||
(durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
count += queue.getMessageCount();
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
long count = 0;
for (String queueName : getQueueNames()) {
Queue queue = server.locateQueue(queueName);
if (queue != null && (durability == DurabilityType.ALL || (durability == DurabilityType.DURABLE && queue.isDurable()) || (durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
count += queue.getMessageCount();
}
}
return count;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
return count;
}
private void checkStarted() {

View File

@ -740,32 +740,38 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public Map<String, Object>[] listScheduledMessages() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.listScheduledMessages(queue);
}
checkStarted();
// it could be a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.listScheduledMessages(queue);
}
checkStarted();
clearIO();
try {
List<MessageReference> refs = queue.getScheduledMessages();
return convertMessagesToMaps(refs);
} finally {
blockOnIO();
clearIO();
try {
List<MessageReference> refs = queue.getScheduledMessages();
return convertMessagesToMaps(refs);
} finally {
blockOnIO();
}
}
}
@Override
public String listScheduledMessagesAsJSON() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.listScheduledMessagesAsJSON(queue);
}
checkStarted();
// it could be a long running task
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.listScheduledMessagesAsJSON(queue);
}
checkStarted();
clearIO();
try {
return QueueControlImpl.toJSON(listScheduledMessages());
} finally {
blockOnIO();
clearIO();
try {
return QueueControlImpl.toJSON(listScheduledMessages());
} finally {
blockOnIO();
}
}
}
@ -969,33 +975,36 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
// long running task
try (AutoCloseable lock = server.managementLock()) {
checkStarted();
clearIO();
clearIO();
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
if (filter == null && groupByProperty == null) {
result.put(null, getMessageCount());
} else {
final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
int count = 0;
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && count++ < limit) {
Message message = iterator.next().getMessage();
internalComputeMessage(result, filter, groupByProperty, message);
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
if (filter == null && groupByProperty == null) {
result.put(null, getMessageCount());
} else {
final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
int count = 0;
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && count++ < limit) {
Message message = iterator.next().getMessage();
internalComputeMessage(result, filter, groupByProperty, message);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
}
return result;
} finally {
blockOnIO();
}
return result;
} finally {
blockOnIO();
}
}
@ -1019,26 +1028,26 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
// it could be a long running task
try (AutoCloseable lock = server.managementLock()) {
checkStarted();
clearIO();
clearIO();
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
if (filter == null && groupByProperty == null) {
result.put(null, Long.valueOf(getDeliveringCount()));
} else {
Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
deliveringMessages.forEach((s, messageReferenceList) ->
messageReferenceList.forEach(messageReference ->
internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
));
Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
if (filter == null && groupByProperty == null) {
result.put(null, Long.valueOf(getDeliveringCount()));
} else {
Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
deliveringMessages.forEach((s, messageReferenceList) -> messageReferenceList.forEach(messageReference -> internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())));
}
return result;
} finally {
blockOnIO();
}
return result;
} finally {
blockOnIO();
}
}
@ -1057,18 +1066,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public boolean removeMessage(final long messageID) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.removeMessage(queue, messageID);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.removeMessage(queue, messageID);
}
checkStarted();
clearIO();
try {
return queue.deleteReference(messageID);
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
clearIO();
try {
return queue.deleteReference(messageID);
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
}
}
@ -1079,30 +1091,33 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public int removeMessages(final int flushLimit, final String filterStr) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.removeMessages(queue, flushLimit, filterStr);
}
checkStarted();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
int removed = 0;
try {
removed = queue.deleteMatchingReferences(flushLimit, filter);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
}
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.removeMessagesFailure(queue.getName().toString());
}
throw e;
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.removeMessages(queue, flushLimit, filterStr);
}
checkStarted();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
int removed = 0;
try {
removed = queue.deleteMatchingReferences(flushLimit, filter);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.removeMessagesSuccess(removed, queue.getName().toString());
}
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.removeMessagesFailure(queue.getName().toString());
}
throw e;
}
return removed;
} finally {
blockOnIO();
}
return removed;
} finally {
blockOnIO();
}
}
@ -1113,87 +1128,99 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public boolean expireMessage(final long messageID) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.expireMessage(queue, messageID);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.expireMessage(queue, messageID);
}
checkStarted();
clearIO();
try {
return queue.expireReference(messageID);
} finally {
blockOnIO();
clearIO();
try {
return queue.expireReference(messageID);
} finally {
blockOnIO();
}
}
}
@Override
public int expireMessages(final String filterStr) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.expireMessages(queue, filterStr);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.expireMessages(queue, filterStr);
}
checkStarted();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
return queue.expireReferences(filter);
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
return queue.expireReferences(filter);
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
}
}
@Override
public boolean retryMessage(final long messageID) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.retryMessage(queue, messageID);
}
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.retryMessage(queue, messageID);
}
checkStarted();
clearIO();
checkStarted();
clearIO();
try {
Filter singleMessageFilter = new Filter() {
@Override
public boolean match(Message message) {
return message.getMessageID() == messageID;
}
try {
Filter singleMessageFilter = new Filter() {
@Override
public boolean match(Message message) {
return message.getMessageID() == messageID;
}
@Override
public boolean match(Map<String, String> map) {
return false;
}
@Override
public boolean match(Map<String, String> map) {
return false;
}
@Override
public boolean match(Filterable filterable) {
return false;
}
@Override
public boolean match(Filterable filterable) {
return false;
}
@Override
public SimpleString getFilterString() {
return new SimpleString("custom filter for MESSAGEID= messageID");
}
};
@Override
public SimpleString getFilterString() {
return new SimpleString("custom filter for MESSAGEID= messageID");
}
};
return queue.retryMessages(singleMessageFilter) > 0;
} finally {
blockOnIO();
return queue.retryMessages(singleMessageFilter) > 0;
} finally {
blockOnIO();
}
}
}
@Override
public int retryMessages() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.retryMessages(queue);
}
checkStarted();
clearIO();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.retryMessages(queue);
}
checkStarted();
clearIO();
try {
return queue.retryMessages(null);
} finally {
blockOnIO();
try {
return queue.retryMessages(null);
} finally {
blockOnIO();
}
}
}
@ -1206,22 +1233,25 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
public boolean moveMessage(final long messageID,
final String otherQueueName,
final boolean rejectDuplicates) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
}
checkStarted();
clearIO();
try {
Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.moveMessage(queue, messageID, otherQueueName, rejectDuplicates);
}
checkStarted();
return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
} finally {
blockOnIO();
clearIO();
try {
Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
}
return queue.moveReference(messageID, binding.getAddress(), binding, rejectDuplicates);
} finally {
blockOnIO();
}
}
}
@ -1245,25 +1275,28 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String otherQueueName,
final boolean rejectDuplicates,
final int messageCount) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
}
checkStarted();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.moveMessages(queue, flushLimit, filterStr, otherQueueName, rejectDuplicates, messageCount);
}
checkStarted();
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
return retValue;
} finally {
blockOnIO();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
}
int retValue = queue.moveReferences(flushLimit, filter, binding.getAddress(), rejectDuplicates, messageCount, binding);
return retValue;
} finally {
blockOnIO();
}
}
}
@ -1277,18 +1310,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessagesToDeadLetterAddress(queue, filterStr);
}
checkStarted();
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
clearIO();
try {
Filter filter = FilterImpl.createFilter(filterStr);
return queue.sendMessagesToDeadLetterAddress(filter);
} finally {
blockOnIO();
return queue.sendMessagesToDeadLetterAddress(filter);
} finally {
blockOnIO();
}
}
}
@ -1310,35 +1346,41 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String user,
final String password,
boolean createMessageId) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
}
try {
String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageThroughManagement(queue, headers, type, body, durable, user, "****");
}
return s;
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.sendMessageFailure(queue.getName().toString(), user);
try {
String s = sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, createMessageId, queue.getID());
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.sendMessageSuccess(queue.getName().toString(), user);
}
return s;
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.sendMessageFailure(queue.getName().toString(), user);
}
throw new IllegalStateException(e.getMessage());
}
throw new IllegalStateException(e.getMessage());
}
}
@Override
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageToDeadLetterAddress(queue, messageID);
}
checkStarted();
clearIO();
try {
return queue.sendMessageToDeadLetterAddress(messageID);
} finally {
blockOnIO();
clearIO();
try {
return queue.sendMessageToDeadLetterAddress(messageID);
} finally {
blockOnIO();
}
}
}
@ -1554,52 +1596,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public CompositeData[] browse(int page, int pageSize, String filter) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.browse(queue, page, pageSize);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.browse(queue, page, pageSize);
}
checkStarted();
clearIO();
try {
long index = 0;
long start = (long) (page - 1) * pageSize;
long end = Math.min(page * pageSize, queue.getMessageCount());
clearIO();
try {
long index = 0;
long start = (long) (page - 1) * pageSize;
long end = Math.min(page * pageSize, queue.getMessageCount());
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && index < end) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
if (index >= start) {
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && index < end) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
if (index >= start) {
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
}
//we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
index++;
}
//we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
index++;
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
CompositeData[] rc = new CompositeData[c.size()];
c.toArray(rc);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
CompositeData[] rc = new CompositeData[c.size()];
c.toArray(rc);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesSuccess(queue.getName().toString(), c.size());
}
return rc;
}
return rc;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesFailure(queue.getName().toString());
}
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesFailure(queue.getName().toString());
}
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
}
@ -1609,46 +1654,49 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
public CompositeData[] browse(String filter) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.browse(queue, filter);
}
checkStarted();
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.browse(queue, filter);
}
checkStarted();
clearIO();
try {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
final int limit = addressSettings.getManagementBrowsePageSize();
int currentPageSize = 0;
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && currentPageSize++ < limit) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
clearIO();
try {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
final int limit = addressSettings.getManagementBrowsePageSize();
int currentPageSize = 0;
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && currentPageSize++ < limit) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
}
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
}
CompositeData[] rc = new CompositeData[c.size()];
c.toArray(rc);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
CompositeData[] rc = new CompositeData[c.size()];
c.toArray(rc);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesSuccess(queue.getName().toString(), currentPageSize);
}
return rc;
}
return rc;
} catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesFailure(queue.getName().toString());
}
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
} catch (ActiveMQException e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesFailure(queue.getName().toString());
}
throw new IllegalStateException(e.getMessage());
} finally {
blockOnIO();
}
}
@ -1714,32 +1762,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public String listGroupsAsJSON() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.listGroupsAsJSON(queue);
}
checkStarted();
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.listGroupsAsJSON(queue);
}
checkStarted();
clearIO();
try {
Map<SimpleString, Consumer> groups = queue.getGroups();
clearIO();
try {
Map<SimpleString, Consumer> groups = queue.getGroups();
JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
if (group.getValue() instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
if (group.getValue() instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
jsonArray.add(obj);
}
jsonArray.add(obj);
}
return jsonArray.build().toString();
} finally {
blockOnIO();
}
return jsonArray.build().toString();
} finally {
blockOnIO();
}
}
@ -1969,31 +2020,37 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
@Override
public void deliverScheduledMessages(String filter) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.deliverScheduledMessage(queue, filter);
}
checkStarted();
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.deliverScheduledMessage(queue, filter);
}
checkStarted();
clearIO();
try {
queue.deliverScheduledMessages(filter);
} finally {
blockOnIO();
clearIO();
try {
queue.deliverScheduledMessages(filter);
} finally {
blockOnIO();
}
}
}
@Override
public void deliverScheduledMessage(long messageId) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.deliverScheduledMessage(queue, messageId);
}
checkStarted();
// prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.deliverScheduledMessage(queue, messageId);
}
checkStarted();
clearIO();
try {
queue.deliverScheduledMessage(messageId);
} finally {
blockOnIO();
clearIO();
try {
queue.deliverScheduledMessage(messageId);
} finally {
blockOnIO();
}
}
}

View File

@ -50,6 +50,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -277,7 +278,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/** Certain management operations shouldn't use more than one thread.
* this semaphore is used to guarantee a single thread used. */
private final Semaphore managementSemaphore = new Semaphore(1);
private final ReentrantLock managementLock = new ReentrantLock();
/**
* This is a thread pool for io tasks only.
@ -522,7 +523,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (replayManager == null) {
throw ActiveMQMessageBundle.BUNDLE.noRetention();
}
replayManager.replay(start, end, address, target, filter);
try (AutoCloseable lock = managementLock()) {
replayManager.replay(start, end, address, target, filter);
}
}
/**
@ -4626,10 +4629,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public AutoCloseable managementLock() throws Exception {
if (!managementSemaphore.tryAcquire(1, TimeUnit.MINUTES)) {
if (!managementLock.tryLock(1, TimeUnit.MINUTES)) {
throw ActiveMQMessageBundle.BUNDLE.managementBusy();
} else {
return managementSemaphore::release;
return managementLock::unlock;
}
}
}

View File

@ -69,23 +69,7 @@ public class ReplayManager {
this.retentionFolder = server.getConfiguration().getJournalRetentionLocation();
}
public void replay(Date start, Date end, String sourceAddress, final String targetAddress, String filter) throws Exception {
if (running.compareAndSet(false, true)) {
try {
actualReplay(start, end, sourceAddress, targetAddress, filter);
} catch (Exception e) {
logger.warn(e.getMessage());
throw e;
} finally {
running.set(false);
}
} else {
throw new RuntimeException("Replay manager is currently busy with another operation");
}
}
private void actualReplay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
public void replay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
logger.debug("Replay::{}", sourceAddress);
if (sourceAddress == null) {