When messages expire take them out of the paged in list so that we can dispatch more messages to other consumers.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@641525 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-26 19:56:29 +00:00
parent 0b6cd58fb7
commit a9ba595748
5 changed files with 34 additions and 9 deletions

View File

@ -111,4 +111,6 @@ public interface Destination extends Service {
* @param value
*/
public void setLazyDispatch(boolean value);
void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node);
}

View File

@ -202,4 +202,8 @@ public class DestinationFilter implements Destination {
public void setLazyDispatch(boolean value) {
next.setLazyDispatch(value);
}
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
next.messageExpired(context, prefetchSubscription, node);
}
}

View File

@ -246,12 +246,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// the
// acknowledgment.
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter
.hasNext(); index++) {
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
final MessageReference node = iter.next();
if( node.isExpired() ) {
broker.messageExpired(getContext(), node);
node.getRegionDestination().messageExpired(context, this, node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
dispatched.remove(node);
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension,
index + 1);
prefetchExtension = Math.max(prefetchExtension, index + 1);
callDispatchMatched = true;
break;
}
@ -471,12 +476,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message.
if (node != QueueMessageReference.NULL_MESSAGE
&& node.isExpired()) {
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
broker.messageExpired(getContext(), node);
dequeueCounter++;
//increment number to dispatch
numberToDispatch++;
node.getRegionDestination().messageExpired(context, this, node);
continue;
}
dispatch(node);

View File

@ -1003,7 +1003,17 @@ public class Queue extends BaseDestination implements Task {
}
wakeup();
}
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference reference) {
((QueueMessageReference)reference).drop();
// Not sure.. perhaps we should forge an ack to remove the message from the store.
// acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
wakeup();
}
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
@ -1037,7 +1047,7 @@ public class Queue extends BaseDestination implements Task {
dispatchLock.lock();
try{
int toPageIn = getMaxPageSize() - pagedInMessages.size();
int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);

View File

@ -631,4 +631,9 @@ public class Topic extends BaseDestination implements Task{
}
}
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
// TODO Auto-generated method stub
}
}