diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 845367823f..9b3fc371b4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.region; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import java.io.IOException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -77,14 +79,14 @@ abstract public class AbstractSubscription implements Subscription { return rc; } - public boolean matches(MessageReference node, MessageEvaluationContext context) { + public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { ConsumerId targetConsumerId = node.getTargetConsumerId(); if ( targetConsumerId!=null) { if( !targetConsumerId.equals(info.getConsumerId()) ) return false; } try { - return selector == null || selector.matches(context); + return (selector == null || selector.matches(context)) && this.context.isAllowedToConsume(node); } catch (JMSException e) { log.info("Selector failed to evaluate: " + e.getMessage(), e); return false; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index b6daf7bc72..f8f92ffcb3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -18,6 +18,8 @@ package org.apache.activemq.broker.region; import javax.jms.InvalidSelectorException; +import java.io.IOException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ConsumerInfo; @@ -63,7 +65,7 @@ public class QueueBrowserSubscription extends PrefetchSubscription { return super.createMessageDispatch(node, message); } } - public boolean matches(MessageReference node, MessageEvaluationContext context) { + public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { return !browseDone && super.matches(node, context); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 98622b67f1..9831215610 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -72,11 +72,6 @@ public class QueueSubscription extends PrefetchSubscription { if( node.isAcked() ) return false; - // allow user-level security - if (!context.isAllowedToConsume(n)) { - return false; - } - // Keep message groups together. String groupId = node.getGroupID(); int sequence = node.getGroupSequence(); @@ -85,7 +80,7 @@ public class QueueSubscription extends PrefetchSubscription { MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners(); // If we can own the first, then no-one else should own the rest. - if( sequence==0 ) { + if( sequence == 1 ) { if( node.lock(this) ) { messageGroupOwners.put(groupId, info.getConsumerId()); return true; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index 9bd8ad06ff..baa4f4cc76 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -51,8 +51,9 @@ public interface Subscription { * @param node * @param context * @return + * @throws IOException */ - boolean matches(MessageReference node, MessageEvaluationContext context); + boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException; /** * Is the subscription interested in messages in the destination?