AMQ-9202 - Make sure Reentrant locks are acquired outside a try block

This is best practice and will prevent unlock from being attempted
inside of a finally block when the thread doesn't actually own the
lock which can happen when the lock attempt throws an exception
such as calling lockInterruptibly()

(cherry picked from commit ed924cddac)
This commit is contained in:
Christopher L. Shannon (cshannon) 2023-02-01 11:17:31 -05:00
parent bf65929fdc
commit 6d91d71c1a
3 changed files with 42 additions and 44 deletions

View File

@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
public void purge() throws Exception { public void purge() throws Exception {
ConnectionContext c = createConnectionContext(); ConnectionContext c = createConnectionContext();
List<MessageReference> list = null; List<MessageReference> list = null;
sendLock.lock();
try { try {
sendLock.lock();
long originalMessageCount = this.destinationStatistics.getMessages().getCount(); long originalMessageCount = this.destinationStatistics.getMessages().getCount();
do { do {
doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed.

View File

@ -2070,28 +2070,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
//flag to know whether the ack forwarding completed without an exception //flag to know whether the ack forwarding completed without an exception
boolean forwarded = false; boolean forwarded = false;
//acquire the checkpoint lock to prevent other threads from
//running a checkpoint while this is running
//
//Normally this task runs on the same executor as the checkpoint task
//so this ack compaction runner wouldn't run at the same time as the checkpoint task.
//
//However, there are two cases where this isn't always true.
//First, the checkpoint() method is public and can be called through the
//PersistenceAdapter interface by someone at the same time this is running.
//Second, a checkpoint is called during shutdown without using the executor.
//
//In the future it might be better to just remove the checkpointLock entirely
//and only use the executor but this would need to be examined for any unintended
//consequences
checkpointLock.readLock().lock();
try { try {
//acquire the checkpoint lock to prevent other threads from // Lock index to capture the ackMessageFileMap data
//running a checkpoint while this is running indexLock.writeLock().lock();
//
//Normally this task runs on the same executor as the checkpoint task
//so this ack compaction runner wouldn't run at the same time as the checkpoint task.
//
//However, there are two cases where this isn't always true.
//First, the checkpoint() method is public and can be called through the
//PersistenceAdapter interface by someone at the same time this is running.
//Second, a checkpoint is called during shutdown without using the executor.
//
//In the future it might be better to just remove the checkpointLock entirely
//and only use the executor but this would need to be examined for any unintended
//consequences
checkpointLock.readLock().lock();
try { try {
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
// Map keys might not be sorted, find the earliest log file to forward acks // Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed. // from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction. // We won't move files that are themselves rewritten on a previous compaction.
@ -2210,27 +2207,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Lock index while we update the ackMessageFileMap. // Lock index while we update the ackMessageFileMap.
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try {
// Update the ack map with the new locations of the acks // Update the ack map with the new locations of the acks
for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
if (referenceFileIds == null) { if (referenceFileIds == null) {
referenceFileIds = new HashSet<>(); referenceFileIds = new HashSet<>();
referenceFileIds.addAll(entry.getValue()); referenceFileIds.addAll(entry.getValue());
metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
metadata.ackMessageFileMapDirtyFlag.lazySet(true); metadata.ackMessageFileMapDirtyFlag.lazySet(true);
} else { } else {
referenceFileIds.addAll(entry.getValue()); referenceFileIds.addAll(entry.getValue());
}
} }
// remove the old location data from the ack map so that the old journal log file can
// be removed on next GC.
metadata.ackMessageFileMap.remove(journalToRead);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
} finally {
indexLock.writeLock().unlock();
} }
// remove the old location data from the ack map so that the old journal log file can
// be removed on next GC.
metadata.ackMessageFileMap.remove(journalToRead);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
indexLock.writeLock().unlock();
LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
} }

View File

@ -77,8 +77,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
Runnable work = addDestinationWork.poll(); Runnable work = addDestinationWork.poll();
if (work != null) { if (work != null) {
addDestinationBarrier.writeLock().lockInterruptibly();
try { try {
addDestinationBarrier.writeLock().lockInterruptibly();
do { do {
work.run(); work.run();
work = addDestinationWork.poll(); work = addDestinationWork.poll();
@ -88,8 +88,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
addDestinationBarrier.writeLock().unlock(); addDestinationBarrier.writeLock().unlock();
} }
} else { } else {
addDestinationBarrier.readLock().lockInterruptibly();
try { try {
addDestinationBarrier.readLock().lockInterruptibly();
return super.addDestination(context, destination, createIfTemporary); return super.addDestination(context, destination, createIfTemporary);
} finally { } finally {
addDestinationBarrier.readLock().unlock(); addDestinationBarrier.readLock().unlock();
@ -102,8 +102,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
Runnable work = addConnectionWork.poll(); Runnable work = addConnectionWork.poll();
if (work != null) { if (work != null) {
addConnectionBarrier.writeLock().lockInterruptibly();
try { try {
addConnectionBarrier.writeLock().lockInterruptibly();
do { do {
work.run(); work.run();
work = addConnectionWork.poll(); work = addConnectionWork.poll();
@ -113,8 +113,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
addConnectionBarrier.writeLock().unlock(); addConnectionBarrier.writeLock().unlock();
} }
} else { } else {
addConnectionBarrier.readLock().lockInterruptibly();
try { try {
addConnectionBarrier.readLock().lockInterruptibly();
super.addConnection(context, info); super.addConnection(context, info);
} finally { } finally {
addConnectionBarrier.readLock().unlock(); addConnectionBarrier.readLock().unlock();
@ -131,8 +131,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
protected void applyDestinationWork() throws Exception { protected void applyDestinationWork() throws Exception {
Runnable work = addDestinationWork.poll(); Runnable work = addDestinationWork.poll();
if (work != null) { if (work != null) {
addDestinationBarrier.writeLock().lockInterruptibly();
try { try {
addDestinationBarrier.writeLock().lockInterruptibly();
do { do {
work.run(); work.run();
work = addDestinationWork.poll(); work = addDestinationWork.poll();