fix for AMQ-591 so that we can add a per message level authorization policy to allow content based authorization on a per message basis

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380684 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-24 15:09:34 +00:00
parent 00d07b7783
commit a640ff5843
7 changed files with 120 additions and 6 deletions

View File

@ -53,6 +53,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
@ -77,7 +78,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
private static final Log serviceLog = LogFactory.getLog(AbstractConnection.class.getName() + ".Service"); private static final Log serviceLog = LogFactory.getLog(AbstractConnection.class.getName() + ".Service");
protected final Broker broker; protected final Broker broker;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList()); protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner; protected final TaskRunner taskRunner;
protected final TransportConnector connector; protected final TransportConnector connector;
@ -624,4 +625,13 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
return statistics; return statistics;
} }
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
} }

View File

@ -51,6 +51,7 @@ import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector; import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.proxy.ProxyConnector; import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.store.DefaultPersistenceAdapterFactory; import org.apache.activemq.store.DefaultPersistenceAdapterFactory;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
@ -95,6 +96,7 @@ public class BrokerService implements Service {
private UsageManager memoryManager; private UsageManager memoryManager;
private PersistenceAdapter persistenceAdapter; private PersistenceAdapter persistenceAdapter;
private DefaultPersistenceAdapterFactory persistenceFactory; private DefaultPersistenceAdapterFactory persistenceFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private List transportConnectors = new CopyOnWriteArrayList(); private List transportConnectors = new CopyOnWriteArrayList();
private List networkConnectors = new CopyOnWriteArrayList(); private List networkConnectors = new CopyOnWriteArrayList();
private List proxyConnectors = new CopyOnWriteArrayList(); private List proxyConnectors = new CopyOnWriteArrayList();
@ -154,6 +156,10 @@ public class BrokerService implements Service {
connector.setBroker(getBroker()); connector.setBroker(getBroker());
connector.setBrokerName(getBrokerName()); connector.setBrokerName(getBrokerName());
connector.setTaskRunnerFactory(getTaskRunnerFactory()); connector.setTaskRunnerFactory(getTaskRunnerFactory());
MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
if (policy != null) {
connector.setMessageAuthorizationPolicy(policy);
}
if (isUseJmx()) { if (isUseJmx()) {
connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), getBrokerObjectName()); connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), getBrokerObjectName());
@ -689,6 +695,18 @@ public class BrokerService implements Service {
this.plugins = plugins; this.plugins = plugins;
} }
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
/**
* Sets the policy used to decide if the current connection is authorized to consume
* a given message
*/
public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
/** /**
* Delete all messages from the persistent store * Delete all messages from the persistent store
* @throws IOException * @throws IOException

View File

@ -18,12 +18,16 @@ package org.apache.activemq.broker;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext; import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transaction.Transaction; import org.apache.activemq.transaction.Transaction;
import java.io.IOException;
/** /**
* Used to hold context information needed to process requests sent to a broker. * Used to hold context information needed to process requests sent to a broker.
* *
@ -45,6 +49,7 @@ public class ConnectionContext {
private WireFormatInfo wireFormatInfo; private WireFormatInfo wireFormatInfo;
private Object longTermStoreContext; private Object longTermStoreContext;
private boolean producerFlowControl=true; private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
@ -112,6 +117,19 @@ public class ConnectionContext {
this.connector = connector; this.connector = connector;
} }
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
/**
* Sets the policy used to decide if the current connection is authorized to consume
* a given message
*/
public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
/** /**
* @return * @return
*/ */
@ -195,4 +213,11 @@ public class ConnectionContext {
this.producerFlowControl = disableProducerFlowControl; this.producerFlowControl = disableProducerFlowControl;
} }
public boolean isAllowedToConsume(MessageReference n) throws IOException {
if (messageAuthorizationPolicy != null) {
return messageAuthorizationPolicy.isAllowedToConsume(this, n.getMessage());
}
return true;
}
} }

View File

@ -27,6 +27,7 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector; import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
@ -55,13 +56,15 @@ public class TransportConnector implements Connector {
private URI uri; private URI uri;
private BrokerInfo brokerInfo = new BrokerInfo(); private BrokerInfo brokerInfo = new BrokerInfo();
private TaskRunnerFactory taskRunnerFactory = null; private TaskRunnerFactory taskRunnerFactory = null;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private DiscoveryAgent discoveryAgent;
protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
protected TransportStatusDetector statusDector; protected TransportStatusDetector statusDector;
private DiscoveryAgent discoveryAgent;
private ConnectorStatistics statistics = new ConnectorStatistics(); private ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri; private URI discoveryUri;
private URI connectUri; private URI connectUri;
/** /**
* @return Returns the connections. * @return Returns the connections.
*/ */
@ -178,6 +181,18 @@ public class TransportConnector implements Connector {
return statistics; return statistics;
} }
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
/**
* Sets the policy used to decide if the current connection is authorized to consume
* a given message
*/
public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
public void start() throws Exception { public void start() throws Exception {
getServer().start(); getServer().start();
log.info("Accepting connection on: "+getServer().getConnectURI()); log.info("Accepting connection on: "+getServer().getConnectURI());
@ -210,7 +225,9 @@ public class TransportConnector implements Connector {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected Connection createConnection(Transport transport) throws IOException { protected Connection createConnection(Transport transport) throws IOException {
return new TransportConnection(this, transport, broker, taskRunnerFactory); TransportConnection answer = new TransportConnection(this, transport, broker, taskRunnerFactory);
answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
return answer;
} }
protected TransportServer createTransportServer() throws IOException, URISyntaxException { protected TransportServer createTransportServer() throws IOException, URISyntaxException {

View File

@ -321,8 +321,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @param node * @param node
* @return false if the message should not be dispatched to the client (another sub may have already dispatched it * @return false if the message should not be dispatched to the client (another sub may have already dispatched it
* for example). * for example).
* @throws IOException
*/ */
abstract protected boolean canDispatch(MessageReference node); abstract protected boolean canDispatch(MessageReference node) throws IOException;
/** /**
* Used during acknowledgment to remove the message. * Used during acknowledgment to remove the message.

View File

@ -67,11 +67,16 @@ public class QueueSubscription extends PrefetchSubscription {
} }
} }
protected boolean canDispatch(MessageReference n) { protected boolean canDispatch(MessageReference n) throws IOException {
IndirectMessageReference node = (IndirectMessageReference) n; IndirectMessageReference node = (IndirectMessageReference) n;
if( node.isAcked() ) if( node.isAcked() )
return false; return false;
// allow user-level security
if (!context.isAllowedToConsume(n)) {
return false;
}
// Keep message groups together. // Keep message groups together.
String groupId = node.getGroupID(); String groupId = node.getGroupID();
int sequence = node.getGroupSequence(); int sequence = node.getGroupSequence();

View File

@ -0,0 +1,38 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.security;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
/**
* A plugin to allow custom message-level security checks to be performed before
* a message is consumed.
*
* @version $Revision$
*/
public interface MessageAuthorizationPolicy {
/**
* Returns true if the given message is able to be dispatched to the connection
* performing any user
*
* @return true if the context is allowed to consume the message
*/
boolean isAllowedToConsume(ConnectionContext context, Message message);
}