Recovery dispatch refactoring as the part of the solution for the https://issues.apache.org/activemq/browse/AMQ-2016

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@736720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-01-22 17:22:12 +00:00
parent ab438f68c1
commit 807f64591d
2 changed files with 32 additions and 22 deletions

View File

@ -233,6 +233,8 @@ public class Queue extends BaseDestination implements Task {
} }
} }
// do recovery dispatch only if it is a browser subscription
if(sub instanceof QueueBrowserSubscription ) {
// any newly paged in messages that are not dispatched are added to pagedInPending in iterate() // any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
doPageIn(false); doPageIn(false);
@ -242,7 +244,7 @@ public class Queue extends BaseDestination implements Task {
rd.subscription = sub; rd.subscription = sub;
recoveries.addLast(rd); recoveries.addLast(rd);
} }
if( sub instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)sub).incrementQueueRef(); ((QueueBrowserSubscription)sub).incrementQueueRef();
} }
if (!(this.optimizedDispatch || isSlave())) { if (!(this.optimizedDispatch || isSlave())) {
@ -303,9 +305,14 @@ public class Queue extends BaseDestination implements Task {
doDispatch(list); doDispatch(list);
} }
} }
//if it is a last consumer (and not a browser) dispatch all pagedIn messages
if (consumers.isEmpty()) { if (consumers.isEmpty() && !(sub instanceof QueueBrowserSubscription)) {
messages.gc(); List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
for (QueueMessageReference ref : pagedInMessages.values()) {
list.add(ref);
}
pagedInPendingDispatch.clear();
doDispatch(list);
} }
if (!(this.optimizedDispatch || isSlave())) { if (!(this.optimizedDispatch || isSlave())) {
wakeup(); wakeup();
@ -615,6 +622,7 @@ public class Queue extends BaseDestination implements Task {
int count = 0; int count = 0;
List<Message> l = new ArrayList<Message>(); List<Message> l = new ArrayList<Message>();
try { try {
pageInMessages(false);
synchronized (this.pagedInPendingDispatch) { synchronized (this.pagedInPendingDispatch) {
for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
.iterator(); i.hasNext() .iterator(); i.hasNext()
@ -657,7 +665,7 @@ public class Queue extends BaseDestination implements Task {
} }
} }
} }
} catch (IOException e) { } catch (Exception e) {
LOG.error("Problem retrieving message in browse() ", e); LOG.error("Problem retrieving message in browse() ", e);
} }
return l.toArray(new Message[l.size()]); return l.toArray(new Message[l.size()]);
@ -899,7 +907,7 @@ public class Queue extends BaseDestination implements Task {
int movedCounter = 0; int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>(); Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do { do {
pageInMessages(); doPageIn(true);
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values()); set.addAll(pagedInMessages.values());
} }
@ -1230,6 +1238,7 @@ public class Queue extends BaseDestination implements Task {
pagedInPendingDispatch.add(qmr); pagedInPendingDispatch.add(qmr);
} }
} }
wakeup();
} }
} }
} }
@ -1268,11 +1277,16 @@ public class Queue extends BaseDestination implements Task {
} }
} }
interestCount++; interestCount++;
} else {
// makes sure it gets dispatched again
if (!node.isDropped() && !((QueueMessageReference)node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
interestCount++;
}
} }
} }
if (target == null && interestCount>0) { if ((target == null && interestCount>0) || consumers.size() == 0) {
// This means all subs were full... // This means all subs were full or that there are no consumers...
rc.add((QueueMessageReference)node); rc.add((QueueMessageReference)node);
} }
@ -1288,10 +1302,6 @@ public class Queue extends BaseDestination implements Task {
} }
} }
//LOG.info(getName()+" Pending messages:");
//for (MessageReference n : rc) {
// LOG.info(getName()+" - " + n.getMessageId());
// }
return rc; return rc;
} }

View File

@ -70,7 +70,7 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
if (result) { if (result) {
result = exclusiveConsumer == null result = exclusiveConsumer == null
|| exclusiveConsumer == subscription; || exclusiveConsumer == subscription;
if (result) { if (result && !subscription.isFull()) {
QueueMessageReference node = (QueueMessageReference) m; QueueMessageReference node = (QueueMessageReference) m;
// Keep message groups together. // Keep message groups together.
String groupId = node.getGroupID(); String groupId = node.getGroupID();