mirror of https://github.com/apache/activemq.git
refactor of the message-level security implementation so that it works with any QoS, not just queues. Also fixed bug in Message Groups where not setting the sequence ID broke the message groups
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a640ff5843
commit
fbb26ba7d7
|
@ -19,6 +19,8 @@ package org.apache.activemq.broker.region;
|
|||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
@ -77,14 +79,14 @@ abstract public class AbstractSubscription implements Subscription {
|
|||
return rc;
|
||||
}
|
||||
|
||||
public boolean matches(MessageReference node, MessageEvaluationContext context) {
|
||||
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
|
||||
ConsumerId targetConsumerId = node.getTargetConsumerId();
|
||||
if ( targetConsumerId!=null) {
|
||||
if( !targetConsumerId.equals(info.getConsumerId()) )
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
return selector == null || selector.matches(context);
|
||||
return (selector == null || selector.matches(context)) && this.context.isAllowedToConsume(node);
|
||||
} catch (JMSException e) {
|
||||
log.info("Selector failed to evaluate: " + e.getMessage(), e);
|
||||
return false;
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.activemq.broker.region;
|
|||
|
||||
import javax.jms.InvalidSelectorException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
|
@ -63,7 +65,7 @@ public class QueueBrowserSubscription extends PrefetchSubscription {
|
|||
return super.createMessageDispatch(node, message);
|
||||
}
|
||||
}
|
||||
public boolean matches(MessageReference node, MessageEvaluationContext context) {
|
||||
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
|
||||
return !browseDone && super.matches(node, context);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,11 +72,6 @@ public class QueueSubscription extends PrefetchSubscription {
|
|||
if( node.isAcked() )
|
||||
return false;
|
||||
|
||||
// allow user-level security
|
||||
if (!context.isAllowedToConsume(n)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Keep message groups together.
|
||||
String groupId = node.getGroupID();
|
||||
int sequence = node.getGroupSequence();
|
||||
|
@ -85,7 +80,7 @@ public class QueueSubscription extends PrefetchSubscription {
|
|||
MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
|
||||
|
||||
// If we can own the first, then no-one else should own the rest.
|
||||
if( sequence==0 ) {
|
||||
if( sequence == 1 ) {
|
||||
if( node.lock(this) ) {
|
||||
messageGroupOwners.put(groupId, info.getConsumerId());
|
||||
return true;
|
||||
|
|
|
@ -51,8 +51,9 @@ public interface Subscription {
|
|||
* @param node
|
||||
* @param context
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean matches(MessageReference node, MessageEvaluationContext context);
|
||||
boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
|
||||
|
||||
/**
|
||||
* Is the subscription interested in messages in the destination?
|
||||
|
|
Loading…
Reference in New Issue