change reference count boundaries around messages - so they

are around acks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@638385 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-18 14:32:41 +00:00
parent 3c4d09d749
commit dc4f2993cc
4 changed files with 3 additions and 71 deletions

View File

@ -610,15 +610,4 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void setMaxAuditDepth(int maxAuditDepth) { public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth; this.maxAuditDepth = maxAuditDepth;
} }
public List<MessageReference> getInFlightMessages(){
List<MessageReference> result = new ArrayList<MessageReference>();
synchronized(pendingLock) {
result.addAll(dispatched);
result.addAll(pending.pageInList(1000));
}
return result;
}
} }

View File

@ -944,6 +944,7 @@ public class Queue extends BaseDestination implements Task {
reference.drop(); reference.drop();
acknowledge(context, sub, ack, reference); acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
reference.decrementReferenceCount();
synchronized(pagedInMessages) { synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId()); pagedInMessages.remove(reference.getMessageId());
} }
@ -1034,6 +1035,7 @@ public class Queue extends BaseDestination implements Task {
if (dispatchSelector.canSelect(s, node)) { if (dispatchSelector.canSelect(s, node)) {
if (!s.isFull()) { if (!s.isFull()) {
s.add(node); s.add(node);
node.incrementReferenceCount();
target = s; target = s;
break; break;
} else { } else {
@ -1055,6 +1057,7 @@ public class Queue extends BaseDestination implements Task {
} }
if (target != null) { if (target != null) {
target.add(node); target.add(node);
node.incrementReferenceCount();
} }
} }
if (target != null && !strictOrderDispatch && consumers.size() > 1 && if (target != null && !strictOrderDispatch && consumers.size() > 1 &&

View File

@ -107,59 +107,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
return info.isExclusive(); return info.isExclusive();
} }
/**
* Override so that the message ref count is > 0 only when the message is
* being dispatched to a client. Keeping it at 0 when it is in the pending
* list allows the message to be swapped out to disk.
*
* @return true if the message was dispatched.
*/
protected boolean dispatch(MessageReference node) throws IOException {
boolean rc = false;
// This brings the message into memory if it was swapped out.
node.incrementReferenceCount();
try {
rc = super.dispatch(node);
} finally {
// If the message was dispatched, it could be getting dispatched
// async, so we
// can only drop the reference count when that completes @see
// onDispatch
if (!rc) {
node.decrementReferenceCount();
}
}
return rc;
}
/**
* OK Message was transmitted, we can now drop the reference count.
*
* @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference,
* org.apache.activemq.command.Message)
*/
protected void onDispatch(MessageReference node, Message message) {
// Now that the message has been sent over the wire to the client,
// we can let it get swapped out.
node.decrementReferenceCount();
super.onDispatch(node, message);
}
/**
* Sending a message to the DQL will require us to increment the ref count
* so we can get at the content.
*/
protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception {
// This brings the message into memory if it was swapped out.
node.incrementReferenceCount();
try {
super.sendToDLQ(context, node);
} finally {
// This let's the message be swapped out of needed.
node.decrementReferenceCount();
}
}
/** /**
*/ */
public void destroy() { public void destroy() {

View File

@ -215,11 +215,4 @@ public interface Subscription extends SubscriptionRecovery {
* @return true if a browser * @return true if a browser
*/ */
boolean isBrowser(); boolean isBrowser();
/**
* Get the list of in flight messages
* @return list
*/
List<MessageReference> getInFlightMessages();
} }