mirror of https://github.com/apache/activemq.git
fix for AMQ-575 to avoid the deadlock when removing consumers, which can sometimes cause messages to be redispatched when another consumer is ack-ing a message. a good lesson this one - keep locks for as short a time as possible
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e82d0fb470
commit
dba97b77c5
|
@ -187,12 +187,14 @@ public class Queue implements Destination {
|
||||||
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
|
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
|
||||||
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
|
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
|
||||||
|
|
||||||
synchronized (messages) {
|
if (!sub.getConsumerInfo().isBrowser()) {
|
||||||
if (!sub.getConsumerInfo().isBrowser()) {
|
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
|
||||||
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
|
try {
|
||||||
try {
|
msgContext.setDestination(destination);
|
||||||
msgContext.setDestination(destination);
|
|
||||||
|
|
||||||
|
// lets copy the messages to dispatch to avoid deadlock
|
||||||
|
List messagesToDispatch = new ArrayList();
|
||||||
|
synchronized (messages) {
|
||||||
for (Iterator iter = messages.iterator(); iter.hasNext();) {
|
for (Iterator iter = messages.iterator(); iter.hasNext();) {
|
||||||
IndirectMessageReference node = (IndirectMessageReference) iter.next();
|
IndirectMessageReference node = (IndirectMessageReference) iter.next();
|
||||||
if (node.isDropped()) {
|
if (node.isDropped()) {
|
||||||
|
@ -202,20 +204,25 @@ public class Queue implements Destination {
|
||||||
String groupID = node.getGroupID();
|
String groupID = node.getGroupID();
|
||||||
|
|
||||||
// Re-deliver all messages that the sub locked
|
// Re-deliver all messages that the sub locked
|
||||||
if (node.getLockOwner() == sub || wasExclusiveOwner
|
if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
|
||||||
|| (groupID != null && ownedGroups.contains(groupID))) {
|
messagesToDispatch.add(node);
|
||||||
node.incrementRedeliveryCounter();
|
|
||||||
node.unlock();
|
|
||||||
msgContext.setMessageReference(node);
|
|
||||||
dispatchPolicy.dispatch(context, node, msgContext, consumers);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
}
|
||||||
msgContext.clear();
|
|
||||||
|
// now lets dispatch from the copy of the collection to avoid deadlocks
|
||||||
|
for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) {
|
||||||
|
IndirectMessageReference node = (IndirectMessageReference) iter.next();
|
||||||
|
node.incrementRedeliveryCounter();
|
||||||
|
node.unlock();
|
||||||
|
msgContext.setMessageReference(node);
|
||||||
|
dispatchPolicy.dispatch(context, node, msgContext, consumers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
finally {
|
||||||
|
msgContext.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
dispatchValve.turnOn();
|
dispatchValve.turnOn();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue