mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2695 - rework getMessageCount to just use the index, build on changes from - https://issues.apache.org/activemq/browse/AMQ-2985
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1038296 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8871c67e8f
commit
21664a9609
|
@ -720,7 +720,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
|
||||
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
|
||||
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
|
||||
indexLock.writeLock().lock();
|
||||
try {
|
||||
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
|
||||
|
@ -733,30 +732,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
|
||||
int counter = 0;
|
||||
try {
|
||||
String selector = info.getSelector();
|
||||
BooleanExpression selectorExpression = null;
|
||||
if (selector != null) {
|
||||
selectorExpression = SelectorParser.parse(selector);
|
||||
}
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
sd.orderIndex.setBatch(tx, cursorPos);
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
if (selectorExpression != null) {
|
||||
MessageEvaluationContext ctx = new MessageEvaluationContext();
|
||||
ctx.setMessageReference(loadMessage(entry.getValue().location));
|
||||
if (selectorExpression.matches(ctx)) {
|
||||
for (Iterator<Entry<Long, HashSet<String>>> iterator =
|
||||
sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
|
||||
Entry<Long, HashSet<String>> entry = iterator.next();
|
||||
if (entry.getValue().contains(subscriptionKey)) {
|
||||
counter++;
|
||||
}
|
||||
} else {
|
||||
counter++;
|
||||
}
|
||||
}
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
} catch (Exception e) {
|
||||
throw IOExceptionSupport.create(e);
|
||||
}
|
||||
return counter;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue