Initial fix for AMQ-5160 to support subscription authorization for destination filters

This commit is contained in:
Dhiraj Bokde 2014-05-01 11:21:50 -07:00 committed by Dejan Bosanac
parent 89e8767973
commit a38a7c0093
6 changed files with 143 additions and 8 deletions

View File

@ -165,8 +165,12 @@ public abstract class AbstractRegion implements Region {
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next(); Subscription sub = iter.next();
if (sub.matches(dest.getActiveMQDestination())) { if (sub.matches(dest.getActiveMQDestination())) {
dest.addSubscription(context, sub); try {
rc.add(sub); dest.addSubscription(context, sub);
rc.add(sub);
} catch (Exception e) {
LOG.error("Subscription error for " + sub + ": " + e.getMessage(), e);
}
} }
} }
return rc; return rc;
@ -290,8 +294,6 @@ public abstract class AbstractRegion implements Region {
Subscription sub = createSubscription(context, info); Subscription sub = createSubscription(context, info);
subscriptions.put(info.getConsumerId(), sub);
// At this point we're done directly manipulating subscriptions, // At this point we're done directly manipulating subscriptions,
// but we need to retain the synchronized block here. Consider // but we need to retain the synchronized block here. Consider
// otherwise what would happen if at this point a second // otherwise what would happen if at this point a second
@ -311,14 +313,26 @@ public abstract class AbstractRegion implements Region {
destinationsLock.readLock().unlock(); destinationsLock.readLock().unlock();
} }
List<Destination> removeList = new ArrayList<Destination>();
for (Destination dest : addList) { for (Destination dest : addList) {
dest.addSubscription(context, sub); try {
dest.addSubscription(context, sub);
removeList.add(dest);
} finally {
// remove subscriptions added earlier
for (Destination remove : removeList) {
remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
}
}
} }
removeList.clear();
if (info.isBrowser()) { if (info.isBrowser()) {
((QueueBrowserSubscription) sub).destinationsAdded(); ((QueueBrowserSubscription) sub).destinationsAdded();
} }
subscriptions.put(info.getConsumerId(), sub);
return sub; return sub;
} }
} }

View File

@ -56,4 +56,9 @@ public class CompositeDestinationInterceptor implements DestinationInterceptor {
public void setInterceptors(final DestinationInterceptor[] interceptors) { public void setInterceptors(final DestinationInterceptor[] interceptors) {
this.interceptors = interceptors; this.interceptors = interceptors;
} }
public DestinationInterceptor[] getInterceptors() {
return interceptors;
}
} }

View File

@ -16,12 +16,17 @@
*/ */
package org.apache.activemq.security; package org.apache.activemq.security;
import java.util.Arrays;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -44,13 +49,25 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
public AuthorizationBroker(Broker next, AuthorizationMap authorizationMap) { public AuthorizationBroker(Broker next, AuthorizationMap authorizationMap) {
super(next); super(next);
this.authorizationMap = authorizationMap; this.authorizationMap = authorizationMap;
// add DestinationInterceptor
final RegionBroker regionBroker = (RegionBroker) next.getAdaptor(RegionBroker.class);
final CompositeDestinationInterceptor compositeInterceptor = (CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor();
DestinationInterceptor[] interceptors = compositeInterceptor.getInterceptors();
interceptors = Arrays.copyOf(interceptors, interceptors.length + 1);
interceptors[interceptors.length - 1] = new AuthorizationDestinationInterceptor(this);
compositeInterceptor.setInterceptors(interceptors);
}
public AuthorizationMap getAuthorizationMap() {
return authorizationMap;
} }
public void setAuthorizationMap(AuthorizationMap map) { public void setAuthorizationMap(AuthorizationMap map) {
authorizationMap = map; authorizationMap = map;
} }
protected SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException { public SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException {
final SecurityContext securityContext = context.getSecurityContext(); final SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) { if (securityContext == null) {
throw new SecurityException("User is not authenticated."); throw new SecurityException("User is not authenticated.");

View File

@ -0,0 +1,48 @@
package org.apache.activemq.security;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
/**
* Authorizes addSubscription calls.
*/
public class AuthorizationDestinationFilter extends DestinationFilter {
private final AuthorizationBroker broker;
public AuthorizationDestinationFilter(Destination destination, AuthorizationBroker broker) {
super(destination);
this.broker = broker;
}
@Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// authorize subscription
final SecurityContext securityContext = broker.checkSecurityContext(context);
final AuthorizationMap authorizationMap = broker.getAuthorizationMap();
// use the destination being filtered, instead of the destination from the consumerinfo in the subscription
// since that could be a wildcard destination
final ActiveMQDestination destination = next.getActiveMQDestination();
Set<?> allowedACLs;
if (!destination.isTemporary()) {
allowedACLs = authorizationMap.getReadACLs(destination);
} else {
allowedACLs = authorizationMap.getTempDestinationReadACLs();
}
if (!securityContext.isBrokerContext() && allowedACLs != null && !securityContext.isInOneOf(allowedACLs) ) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to read from: " + destination);
}
securityContext.getAuthorizedReadDests().put(destination, destination);
super.addSubscription(context, sub);
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.security;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
/**
* Adds AuthorizationDestinationFilter on intercept()
*/
public class AuthorizationDestinationInterceptor implements DestinationInterceptor {
private final AuthorizationBroker broker;
public AuthorizationDestinationInterceptor(AuthorizationBroker broker) {
this.broker = broker;
}
@Override
public Destination intercept(Destination destination) {
return new AuthorizationDestinationFilter(destination, broker);
}
@Override
public void remove(Destination destination) {
// do nothing
}
@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
// do nothing
}
}

View File

@ -18,11 +18,10 @@ package org.apache.activemq;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
import org.junit.Test;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.junit.Test;
public class AuthorizationTest extends RuntimeConfigTestSupport { public class AuthorizationTest extends RuntimeConfigTestSupport {
@ -45,6 +44,7 @@ public class AuthorizationTest extends RuntimeConfigTestSupport {
assertAllowed("user", "USERS.A"); assertAllowed("user", "USERS.A");
assertAllowed("guest", "GUESTS.A"); assertAllowed("guest", "GUESTS.A");
assertDenied("user", "GUESTS.A"); assertDenied("user", "GUESTS.A");
assertDenied("user", ">");
assertAllowedTemp("guest"); assertAllowedTemp("guest");
} }