https://issues.apache.org/jira/browse/AMQ-4758 - improve performance of pull consumers

This commit is contained in:
Dejan Bosanac 2013-10-01 12:21:50 +02:00
parent e64f83abcc
commit 2a5ad365d7
2 changed files with 19 additions and 12 deletions

View File

@ -2003,13 +2003,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
consumersLock.writeLock().unlock();
}
PendingList rc;
if(isPrioritizedMessages()) {
rc = new PrioritizedPendingList();
} else {
rc = new OrderedPendingList();
}
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
@ -2027,6 +2020,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
// Dispatch it.
s.add(node);
iterator.remove();
target = s;
break;
}
@ -2043,10 +2037,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
if ((target == null && interestCount > 0) || consumers.size() == 0) {
// This means all subs were full or that there are no
// consumers...
rc.addMessageLast(node);
// return if there are no consumers or all consumers are full
if (target == null && (consumers.size() == 0 || consumers.size() == fullConsumers.size())) {
return list;
}
// If it got dispatched, rotate the consumer list to get round robin
@ -2065,7 +2058,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
return rc;
return list;
}
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {

View File

@ -250,6 +250,20 @@
<artifactId>regexp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.17</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
<optional>true</optional>
<scope>test</scope>
</dependency>
<!-- Joram JMS conformance tests -->
<dependency>