git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@957181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-06-23 13:06:19 +00:00
parent 2aa2278fc2
commit 9f417a2e5a
4 changed files with 83 additions and 13 deletions

View File

@ -21,14 +21,19 @@ import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -190,7 +195,71 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
public int getMessageCount(String clientId, String subscriberName) throws IOException {
flush();
return topicReferenceStore.getMessageCount(clientId, subscriberName);
SubscriptionInfo info = lookupSubscription(clientId, subscriberName);
try {
MessageCounter counter = new MessageCounter(info, this);
topicReferenceStore.recoverSubscription(clientId, subscriberName, counter);
return counter.count;
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
private class MessageCounter implements MessageRecoveryListener {
int count = 0;
SubscriptionInfo info;
BooleanExpression selectorExpression;
TopicMessageStore store;
public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception {
this.info = info;
if (info != null) {
String selector = info.getSelector();
if (selector != null) {
this.selectorExpression = SelectorParser.parse(selector);
}
}
this.store = store;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
if (selectorExpression != null) {
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setMessageReference(store.getMessage(ref));
if (selectorExpression.matches(ctx)) {
count++;
}
} else {
count ++;
}
return true;
}
@Override
public boolean recoverMessage(Message message) throws Exception {
if (selectorExpression != null) {
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setMessageReference(store.getMessage(message.getMessageId()));
if (selectorExpression.matches(ctx)) {
count++;
}
} else {
count++;
}
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean hasSpace() {
return true;
}
}
public void resetBatching(String clientId, String subscriptionName) {

View File

@ -328,7 +328,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
if (container != null) {
for (Iterator i = container.iterator(); i.hasNext();) {
ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
ReferenceRecord msg = messageContainer.get(ref.getMessageEntry());
ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry());
if (msg != null) {
if (!recoverReference(listener, msg)) {
break;

View File

@ -672,26 +672,26 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
cursorPos += 1;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
try {
String selector = info.getSelector();
BooleanExpression selectorExpression = null;
if (selector != null) {
try {
if (selector != null) {
BooleanExpression selectorExpression = SelectorParser.parse(selector);
selectorExpression = SelectorParser.parse(selector);
}
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
if (selectorExpression != null) {
MessageEvaluationContext ctx = new MessageEvaluationContext();
ctx.setMessageReference(loadMessage(entry.getValue().location));
if (selectorExpression.matches(ctx)) {
counter++;
}
}
} catch (Exception e) {
throw IOExceptionSupport.create(e);
} else {
counter++;
}
} else {
counter++;
}
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
return counter;
}

View File

@ -130,6 +130,7 @@ abstract public class DurableSubscriptionSelectorTest extends org.apache.activem
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
broker.setPersistenceAdapter(createPersistenceAdapter());
broker.start();
}