mirror of https://github.com/apache/activemq.git
Fix queue reference counting.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@639315 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b3931192fc
commit
c9c11074a8
|
@ -458,11 +458,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if(isDropped(node)) {
|
|
||||||
pending.remove();
|
|
||||||
}
|
|
||||||
else if (canDispatch(node)) {
|
|
||||||
pending.remove();
|
pending.remove();
|
||||||
|
if( !isDropped(node) && canDispatch(node)) {
|
||||||
|
|
||||||
// Message may have been sitting in the pending
|
// Message may have been sitting in the pending
|
||||||
// list a while waiting for the consumer to ak the message.
|
// list a while waiting for the consumer to ak the message.
|
||||||
if (node != QueueMessageReference.NULL_MESSAGE
|
if (node != QueueMessageReference.NULL_MESSAGE
|
||||||
|
|
|
@ -1002,7 +1002,6 @@ public class Queue extends BaseDestination implements Task {
|
||||||
reference.drop();
|
reference.drop();
|
||||||
acknowledge(context, sub, ack, reference);
|
acknowledge(context, sub, ack, reference);
|
||||||
destinationStatistics.getMessages().decrement();
|
destinationStatistics.getMessages().decrement();
|
||||||
reference.decrementReferenceCount();
|
|
||||||
synchronized(pagedInMessages) {
|
synchronized(pagedInMessages) {
|
||||||
pagedInMessages.remove(reference.getMessageId());
|
pagedInMessages.remove(reference.getMessageId());
|
||||||
}
|
}
|
||||||
|
@ -1056,6 +1055,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
messages.reset();
|
messages.reset();
|
||||||
while (messages.hasNext() && count < toPageIn) {
|
while (messages.hasNext() && count < toPageIn) {
|
||||||
MessageReference node = messages.next();
|
MessageReference node = messages.next();
|
||||||
|
node.incrementReferenceCount();
|
||||||
messages.remove();
|
messages.remove();
|
||||||
if (!broker.isExpired(node)) {
|
if (!broker.isExpired(node)) {
|
||||||
QueueMessageReference ref = createMessageReference(node.getMessage());
|
QueueMessageReference ref = createMessageReference(node.getMessage());
|
||||||
|
@ -1097,7 +1097,6 @@ public class Queue extends BaseDestination implements Task {
|
||||||
if (dispatchSelector.canSelect(s, node)) {
|
if (dispatchSelector.canSelect(s, node)) {
|
||||||
if (!s.isFull()) {
|
if (!s.isFull()) {
|
||||||
s.add(node);
|
s.add(node);
|
||||||
node.incrementReferenceCount();
|
|
||||||
target = s;
|
target = s;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue