diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 4bc6690464..79f897ec12 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; + sendLock.lock(); try { - sendLock.lock(); long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 55137db18b..6687c56b49 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2070,28 +2070,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe //flag to know whether the ack forwarding completed without an exception boolean forwarded = false; + //acquire the checkpoint lock to prevent other threads from + //running a checkpoint while this is running + // + //Normally this task runs on the same executor as the checkpoint task + //so this ack compaction runner wouldn't run at the same time as the checkpoint task. + // + //However, there are two cases where this isn't always true. + //First, the checkpoint() method is public and can be called through the + //PersistenceAdapter interface by someone at the same time this is running. + //Second, a checkpoint is called during shutdown without using the executor. + // + //In the future it might be better to just remove the checkpointLock entirely + //and only use the executor but this would need to be examined for any unintended + //consequences + checkpointLock.readLock().lock(); try { - //acquire the checkpoint lock to prevent other threads from - //running a checkpoint while this is running - // - //Normally this task runs on the same executor as the checkpoint task - //so this ack compaction runner wouldn't run at the same time as the checkpoint task. - // - //However, there are two cases where this isn't always true. - //First, the checkpoint() method is public and can be called through the - //PersistenceAdapter interface by someone at the same time this is running. - //Second, a checkpoint is called during shutdown without using the executor. - // - //In the future it might be better to just remove the checkpointLock entirely - //and only use the executor but this would need to be examined for any unintended - //consequences - checkpointLock.readLock().lock(); - + // Lock index to capture the ackMessageFileMap data + indexLock.writeLock().lock(); try { - - // Lock index to capture the ackMessageFileMap data - indexLock.writeLock().lock(); - // Map keys might not be sorted, find the earliest log file to forward acks // from and move only those, future cycles can chip away at more as needed. // We won't move files that are themselves rewritten on a previous compaction. @@ -2210,27 +2207,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Lock index while we update the ackMessageFileMap. indexLock.writeLock().lock(); - - // Update the ack map with the new locations of the acks - for (Entry> entry : updatedAckLocations.entrySet()) { - Set referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); - if (referenceFileIds == null) { - referenceFileIds = new HashSet<>(); - referenceFileIds.addAll(entry.getValue()); - metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); - metadata.ackMessageFileMapDirtyFlag.lazySet(true); - } else { - referenceFileIds.addAll(entry.getValue()); + try { + // Update the ack map with the new locations of the acks + for (Entry> entry : updatedAckLocations.entrySet()) { + Set referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); + if (referenceFileIds == null) { + referenceFileIds = new HashSet<>(); + referenceFileIds.addAll(entry.getValue()); + metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); + } else { + referenceFileIds.addAll(entry.getValue()); + } } + + // remove the old location data from the ack map so that the old journal log file can + // be removed on next GC. + metadata.ackMessageFileMap.remove(journalToRead); + metadata.ackMessageFileMapDirtyFlag.lazySet(true); + } finally { + indexLock.writeLock().unlock(); } - // remove the old location data from the ack map so that the old journal log file can - // be removed on next GC. - metadata.ackMessageFileMap.remove(journalToRead); - metadata.ackMessageFileMapDirtyFlag.lazySet(true); - - indexLock.writeLock().unlock(); - LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); } diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java index c672579b33..c5c5ca367d 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java @@ -77,8 +77,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { Runnable work = addDestinationWork.poll(); if (work != null) { + addDestinationBarrier.writeLock().lockInterruptibly(); try { - addDestinationBarrier.writeLock().lockInterruptibly(); do { work.run(); work = addDestinationWork.poll(); @@ -88,8 +88,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { addDestinationBarrier.writeLock().unlock(); } } else { + addDestinationBarrier.readLock().lockInterruptibly(); try { - addDestinationBarrier.readLock().lockInterruptibly(); return super.addDestination(context, destination, createIfTemporary); } finally { addDestinationBarrier.readLock().unlock(); @@ -102,8 +102,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { Runnable work = addConnectionWork.poll(); if (work != null) { + addConnectionBarrier.writeLock().lockInterruptibly(); try { - addConnectionBarrier.writeLock().lockInterruptibly(); do { work.run(); work = addConnectionWork.poll(); @@ -113,8 +113,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { addConnectionBarrier.writeLock().unlock(); } } else { + addConnectionBarrier.readLock().lockInterruptibly(); try { - addConnectionBarrier.readLock().lockInterruptibly(); super.addConnection(context, info); } finally { addConnectionBarrier.readLock().unlock(); @@ -131,8 +131,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { protected void applyDestinationWork() throws Exception { Runnable work = addDestinationWork.poll(); if (work != null) { + addDestinationBarrier.writeLock().lockInterruptibly(); try { - addDestinationBarrier.writeLock().lockInterruptibly(); do { work.run(); work = addDestinationWork.poll();