Updating patch to make sure SelectorAwareVirtualTopics are covered
and code cleanup
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-11-23 20:35:32 +00:00
parent bf36c4cb88
commit bc9edf00d1
2 changed files with 19 additions and 18 deletions

View File

@ -50,21 +50,25 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
*/ */
@Override @Override
protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException { protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException {
boolean matches = false; //first validate that the prefix matches in the super class
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); if (super.shouldDispatch(broker, message, dest)) {
msgContext.setDestination(dest.getActiveMQDestination()); boolean matches = false;
msgContext.setMessageReference(message); MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
List<Subscription> subs = dest.getConsumers(); msgContext.setDestination(dest.getActiveMQDestination());
for (Subscription sub : subs) { msgContext.setMessageReference(message);
if (sub.matches(message, msgContext)) { List<Subscription> subs = dest.getConsumers();
matches = true; for (Subscription sub : subs) {
break; if (sub.matches(message, msgContext)) {
matches = true;
break;
}
} }
if (matches == false) {
matches = tryMatchingCachedSubs(broker, dest, msgContext);
}
return matches;
} }
if (matches == false) { return false;
matches = tryMatchingCachedSubs(broker, dest, msgContext);
}
return matches;
} }
private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) { private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {

View File

@ -142,11 +142,8 @@ public class VirtualTopicInterceptor extends DestinationFilter {
} }
protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException { protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException {
//can't find .* in the prefix, so default back to old logic and return true //if can't find .* in the prefix, default back to old logic and return true
if(prefix.indexOf(".*")>0){ return prefix.contains(".*") ? dest.getName().startsWith(prefix.substring(0, prefix.indexOf(".*"))) : true;
return dest.getName().startsWith(prefix.substring(0,prefix.indexOf(".*")));
}
else return true;
} }
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) { protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {