mirror of https://github.com/apache/activemq.git
Merge pull request #959 from cshannon/AMQ-9202
AMQ-9202 - Make sure Reentrant locks are acquired outside a try block
This commit is contained in:
commit
3f68f49939
|
@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
public void purge() throws Exception {
|
public void purge() throws Exception {
|
||||||
ConnectionContext c = createConnectionContext();
|
ConnectionContext c = createConnectionContext();
|
||||||
List<MessageReference> list = null;
|
List<MessageReference> list = null;
|
||||||
try {
|
|
||||||
sendLock.lock();
|
sendLock.lock();
|
||||||
|
try {
|
||||||
long originalMessageCount = this.destinationStatistics.getMessages().getCount();
|
long originalMessageCount = this.destinationStatistics.getMessages().getCount();
|
||||||
do {
|
do {
|
||||||
doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed.
|
doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed.
|
||||||
|
|
|
@ -2070,7 +2070,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
//flag to know whether the ack forwarding completed without an exception
|
//flag to know whether the ack forwarding completed without an exception
|
||||||
boolean forwarded = false;
|
boolean forwarded = false;
|
||||||
|
|
||||||
try {
|
|
||||||
//acquire the checkpoint lock to prevent other threads from
|
//acquire the checkpoint lock to prevent other threads from
|
||||||
//running a checkpoint while this is running
|
//running a checkpoint while this is running
|
||||||
//
|
//
|
||||||
|
@ -2086,12 +2085,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
//and only use the executor but this would need to be examined for any unintended
|
//and only use the executor but this would need to be examined for any unintended
|
||||||
//consequences
|
//consequences
|
||||||
checkpointLock.readLock().lock();
|
checkpointLock.readLock().lock();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// Lock index to capture the ackMessageFileMap data
|
// Lock index to capture the ackMessageFileMap data
|
||||||
indexLock.writeLock().lock();
|
indexLock.writeLock().lock();
|
||||||
|
try {
|
||||||
// Map keys might not be sorted, find the earliest log file to forward acks
|
// Map keys might not be sorted, find the earliest log file to forward acks
|
||||||
// from and move only those, future cycles can chip away at more as needed.
|
// from and move only those, future cycles can chip away at more as needed.
|
||||||
// We won't move files that are themselves rewritten on a previous compaction.
|
// We won't move files that are themselves rewritten on a previous compaction.
|
||||||
|
@ -2210,7 +2207,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
|
|
||||||
// Lock index while we update the ackMessageFileMap.
|
// Lock index while we update the ackMessageFileMap.
|
||||||
indexLock.writeLock().lock();
|
indexLock.writeLock().lock();
|
||||||
|
try {
|
||||||
// Update the ack map with the new locations of the acks
|
// Update the ack map with the new locations of the acks
|
||||||
for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
|
for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
|
||||||
Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
|
Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
|
||||||
|
@ -2228,8 +2225,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
// be removed on next GC.
|
// be removed on next GC.
|
||||||
metadata.ackMessageFileMap.remove(journalToRead);
|
metadata.ackMessageFileMap.remove(journalToRead);
|
||||||
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
|
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
|
||||||
|
} finally {
|
||||||
indexLock.writeLock().unlock();
|
indexLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
|
LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,8 +77,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
||||||
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
|
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
|
||||||
Runnable work = addDestinationWork.poll();
|
Runnable work = addDestinationWork.poll();
|
||||||
if (work != null) {
|
if (work != null) {
|
||||||
try {
|
|
||||||
addDestinationBarrier.writeLock().lockInterruptibly();
|
addDestinationBarrier.writeLock().lockInterruptibly();
|
||||||
|
try {
|
||||||
do {
|
do {
|
||||||
work.run();
|
work.run();
|
||||||
work = addDestinationWork.poll();
|
work = addDestinationWork.poll();
|
||||||
|
@ -88,8 +88,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
||||||
addDestinationBarrier.writeLock().unlock();
|
addDestinationBarrier.writeLock().unlock();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
try {
|
|
||||||
addDestinationBarrier.readLock().lockInterruptibly();
|
addDestinationBarrier.readLock().lockInterruptibly();
|
||||||
|
try {
|
||||||
return super.addDestination(context, destination, createIfTemporary);
|
return super.addDestination(context, destination, createIfTemporary);
|
||||||
} finally {
|
} finally {
|
||||||
addDestinationBarrier.readLock().unlock();
|
addDestinationBarrier.readLock().unlock();
|
||||||
|
@ -102,8 +102,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
||||||
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
|
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
|
||||||
Runnable work = addConnectionWork.poll();
|
Runnable work = addConnectionWork.poll();
|
||||||
if (work != null) {
|
if (work != null) {
|
||||||
try {
|
|
||||||
addConnectionBarrier.writeLock().lockInterruptibly();
|
addConnectionBarrier.writeLock().lockInterruptibly();
|
||||||
|
try {
|
||||||
do {
|
do {
|
||||||
work.run();
|
work.run();
|
||||||
work = addConnectionWork.poll();
|
work = addConnectionWork.poll();
|
||||||
|
@ -113,8 +113,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
||||||
addConnectionBarrier.writeLock().unlock();
|
addConnectionBarrier.writeLock().unlock();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
try {
|
|
||||||
addConnectionBarrier.readLock().lockInterruptibly();
|
addConnectionBarrier.readLock().lockInterruptibly();
|
||||||
|
try {
|
||||||
super.addConnection(context, info);
|
super.addConnection(context, info);
|
||||||
} finally {
|
} finally {
|
||||||
addConnectionBarrier.readLock().unlock();
|
addConnectionBarrier.readLock().unlock();
|
||||||
|
@ -131,8 +131,8 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
||||||
protected void applyDestinationWork() throws Exception {
|
protected void applyDestinationWork() throws Exception {
|
||||||
Runnable work = addDestinationWork.poll();
|
Runnable work = addDestinationWork.poll();
|
||||||
if (work != null) {
|
if (work != null) {
|
||||||
try {
|
|
||||||
addDestinationBarrier.writeLock().lockInterruptibly();
|
addDestinationBarrier.writeLock().lockInterruptibly();
|
||||||
|
try {
|
||||||
do {
|
do {
|
||||||
work.run();
|
work.run();
|
||||||
work = addDestinationWork.poll();
|
work = addDestinationWork.poll();
|
||||||
|
|
Loading…
Reference in New Issue