From a640ff58433a435c811f4c33d1cd998a031cc784 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 24 Feb 2006 15:09:34 +0000 Subject: [PATCH] fix for AMQ-591 so that we can add a per message level authorization policy to allow content based authorization on a per message basis git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380684 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/AbstractConnection.java | 12 +++++- .../apache/activemq/broker/BrokerService.java | 20 +++++++++- .../activemq/broker/ConnectionContext.java | 25 ++++++++++++ .../activemq/broker/TransportConnector.java | 21 +++++++++- .../broker/region/PrefetchSubscription.java | 3 +- .../broker/region/QueueSubscription.java | 7 +++- .../security/MessageAuthorizationPolicy.java | 38 +++++++++++++++++++ 7 files changed, 120 insertions(+), 6 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/security/MessageAuthorizationPolicy.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index f930ab8933..fba682ed0d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -53,6 +53,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ProducerState; @@ -77,7 +78,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C private static final Log serviceLog = LogFactory.getLog(AbstractConnection.class.getName() + ".Service"); protected final Broker broker; - + private MessageAuthorizationPolicy messageAuthorizationPolicy; protected final List dispatchQueue = Collections.synchronizedList(new LinkedList()); protected final TaskRunner taskRunner; protected final TransportConnector connector; @@ -624,4 +625,13 @@ public abstract class AbstractConnection implements Service, Connection, Task, C return statistics; } + public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { + return messageAuthorizationPolicy; + } + + public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { + this.messageAuthorizationPolicy = messageAuthorizationPolicy; + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 57813797fe..803f597c6b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -51,6 +51,7 @@ import org.apache.activemq.memory.UsageManager; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.jms.JmsConnector; import org.apache.activemq.proxy.ProxyConnector; +import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.store.DefaultPersistenceAdapterFactory; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; @@ -95,6 +96,7 @@ public class BrokerService implements Service { private UsageManager memoryManager; private PersistenceAdapter persistenceAdapter; private DefaultPersistenceAdapterFactory persistenceFactory; + private MessageAuthorizationPolicy messageAuthorizationPolicy; private List transportConnectors = new CopyOnWriteArrayList(); private List networkConnectors = new CopyOnWriteArrayList(); private List proxyConnectors = new CopyOnWriteArrayList(); @@ -154,7 +156,11 @@ public class BrokerService implements Service { connector.setBroker(getBroker()); connector.setBrokerName(getBrokerName()); connector.setTaskRunnerFactory(getTaskRunnerFactory()); - + MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); + if (policy != null) { + connector.setMessageAuthorizationPolicy(policy); + } + if (isUseJmx()) { connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), getBrokerObjectName()); registerConnectorMBean(connector); @@ -689,6 +695,18 @@ public class BrokerService implements Service { this.plugins = plugins; } + public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { + return messageAuthorizationPolicy; + } + + /** + * Sets the policy used to decide if the current connection is authorized to consume + * a given message + */ + public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { + this.messageAuthorizationPolicy = messageAuthorizationPolicy; + } + /** * Delete all messages from the persistent store * @throws IOException diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index 14fc678ec5..5df44eec32 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -18,12 +18,16 @@ package org.apache.activemq.broker; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.transaction.Transaction; +import java.io.IOException; + /** * Used to hold context information needed to process requests sent to a broker. * @@ -45,6 +49,7 @@ public class ConnectionContext { private WireFormatInfo wireFormatInfo; private Object longTermStoreContext; private boolean producerFlowControl=true; + private MessageAuthorizationPolicy messageAuthorizationPolicy; private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); @@ -112,6 +117,19 @@ public class ConnectionContext { this.connector = connector; } + + public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { + return messageAuthorizationPolicy; + } + + /** + * Sets the policy used to decide if the current connection is authorized to consume + * a given message + */ + public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { + this.messageAuthorizationPolicy = messageAuthorizationPolicy; + } + /** * @return */ @@ -195,4 +213,11 @@ public class ConnectionContext { this.producerFlowControl = disableProducerFlowControl; } + public boolean isAllowedToConsume(MessageReference n) throws IOException { + if (messageAuthorizationPolicy != null) { + return messageAuthorizationPolicy.isAllowedToConsume(this, n.getMessage()); + } + return true; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index 569bb0b03c..398b746b6f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -27,6 +27,7 @@ import javax.management.ObjectName; import org.apache.activemq.broker.jmx.ManagedTransportConnector; import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; @@ -55,13 +56,15 @@ public class TransportConnector implements Connector { private URI uri; private BrokerInfo brokerInfo = new BrokerInfo(); private TaskRunnerFactory taskRunnerFactory = null; + private MessageAuthorizationPolicy messageAuthorizationPolicy; + private DiscoveryAgent discoveryAgent; protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); protected TransportStatusDetector statusDector; - private DiscoveryAgent discoveryAgent; private ConnectorStatistics statistics = new ConnectorStatistics(); private URI discoveryUri; private URI connectUri; + /** * @return Returns the connections. */ @@ -177,6 +180,18 @@ public class TransportConnector implements Connector { public ConnectorStatistics getStatistics() { return statistics; } + + public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { + return messageAuthorizationPolicy; + } + + /** + * Sets the policy used to decide if the current connection is authorized to consume + * a given message + */ + public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { + this.messageAuthorizationPolicy = messageAuthorizationPolicy; + } public void start() throws Exception { getServer().start(); @@ -210,7 +225,9 @@ public class TransportConnector implements Connector { // Implementation methods // ------------------------------------------------------------------------- protected Connection createConnection(Transport transport) throws IOException { - return new TransportConnection(this, transport, broker, taskRunnerFactory); + TransportConnection answer = new TransportConnection(this, transport, broker, taskRunnerFactory); + answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); + return answer; } protected TransportServer createTransportServer() throws IOException, URISyntaxException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 9ff6b96ed9..a6781458f8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -321,8 +321,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ * @param node * @return false if the message should not be dispatched to the client (another sub may have already dispatched it * for example). + * @throws IOException */ - abstract protected boolean canDispatch(MessageReference node); + abstract protected boolean canDispatch(MessageReference node) throws IOException; /** * Used during acknowledgment to remove the message. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 025e4878f1..98622b67f1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -67,11 +67,16 @@ public class QueueSubscription extends PrefetchSubscription { } } - protected boolean canDispatch(MessageReference n) { + protected boolean canDispatch(MessageReference n) throws IOException { IndirectMessageReference node = (IndirectMessageReference) n; if( node.isAcked() ) return false; + // allow user-level security + if (!context.isAllowedToConsume(n)) { + return false; + } + // Keep message groups together. String groupId = node.getGroupID(); int sequence = node.getGroupSequence(); diff --git a/activemq-core/src/main/java/org/apache/activemq/security/MessageAuthorizationPolicy.java b/activemq-core/src/main/java/org/apache/activemq/security/MessageAuthorizationPolicy.java new file mode 100644 index 0000000000..b4531ac9ef --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/security/MessageAuthorizationPolicy.java @@ -0,0 +1,38 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.security; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.Message; + +/** + * A plugin to allow custom message-level security checks to be performed before + * a message is consumed. + * + * @version $Revision$ + */ +public interface MessageAuthorizationPolicy { + + /** + * Returns true if the given message is able to be dispatched to the connection + * performing any user + * + * @return true if the context is allowed to consume the message + */ + boolean isAllowedToConsume(ConnectionContext context, Message message); + +}