Fixed AMQ-5160, allowed wildcard subscriptions for future destinations, added tests for wildcard authorization, fixed consumer and producer AdvisoryTopic names for composite destinations by replacing ',' with '‚'

This commit is contained in:
Dhiraj Bokde 2014-05-01 19:13:01 -07:00 committed by Dejan Bosanac
parent a38a7c0093
commit 94b404d0ab
11 changed files with 207 additions and 15 deletions

View File

@ -168,8 +168,13 @@ public abstract class AbstractRegion implements Region {
try {
dest.addSubscription(context, sub);
rc.add(sub);
} catch (Exception e) {
LOG.error("Subscription error for " + sub + ": " + e.getMessage(), e);
} catch (SecurityException e) {
if (sub.isWildcard()) {
LOG.debug("Subscription denied for " + sub + " to destination " +
dest.getActiveMQDestination() + ": " + e.getMessage());
} else {
throw e;
}
}
}
}
@ -318,10 +323,20 @@ public abstract class AbstractRegion implements Region {
try {
dest.addSubscription(context, sub);
removeList.add(dest);
} finally {
// remove subscriptions added earlier
for (Destination remove : removeList) {
remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
} catch (SecurityException e){
if (sub.isWildcard()) {
LOG.debug("Subscription denied for " + sub + " to destination " +
dest.getActiveMQDestination() + ": " + e.getMessage());
} else {
// remove partial subscriptions
for (Destination remove : removeList) {
try {
remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
} catch (Exception ex) {
LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex);
}
}
throw e;
}
}
}

View File

@ -21,10 +21,10 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -108,6 +108,11 @@ public abstract class AbstractSubscription implements Subscription {
}
}
@Override
public boolean isWildcard() {
return destinationFilter.isWildcard();
}
@Override
public boolean matches(ActiveMQDestination destination) {
return destinationFilter.matches(destination);

View File

@ -56,6 +56,12 @@ public interface Subscription extends SubscriptionRecovery {
*/
Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception;
/**
* Returns true if this subscription is a Wildcard subscription.
* @return true if wildcard subscription.
*/
boolean isWildcard();
/**
* Is the subscription interested in the message?
* @param node

View File

@ -67,7 +67,7 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
authorizationMap = map;
}
public SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException {
protected SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException {
final SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) {
throw new SecurityException("User is not authenticated.");

View File

@ -105,11 +105,13 @@ public final class AdvisorySupport {
}
public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
String prefix;
if (destination.isQueue()) {
return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
prefix = QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX;
} else {
return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
prefix = TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX;
}
return getAdvisoryTopic(destination, prefix, true);
}
public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
@ -117,11 +119,17 @@ public final class AdvisorySupport {
}
public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
String prefix;
if (destination.isQueue()) {
return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
} else {
return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
}
return getAdvisoryTopic(destination, prefix, false);
}
private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚"));
}
public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {

View File

@ -46,6 +46,12 @@ public class CompositeDestinationFilter extends DestinationFilter {
}
public boolean isWildcard() {
return true;
for (DestinationFilter filter : filters) {
if (filter.isWildcard()) {
return true;
}
}
return false;
}
}

View File

@ -72,4 +72,5 @@ public abstract class DestinationFilter implements BooleanExpression {
return new SimpleDestinationFilter(destination);
}
public abstract boolean isWildcard();
}

View File

@ -17,14 +17,19 @@
package org.apache.activemq;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
public class AuthorizationTest extends RuntimeConfigTestSupport {
private static final int RECEIVE_TIMEOUT = 1000;
String configurationSeed = "authorizationTest";
@Test
@ -44,7 +49,6 @@ public class AuthorizationTest extends RuntimeConfigTestSupport {
assertAllowed("user", "USERS.A");
assertAllowed("guest", "GUESTS.A");
assertDenied("user", "GUESTS.A");
assertDenied("user", ">");
assertAllowedTemp("guest");
}
@ -68,6 +72,85 @@ public class AuthorizationTest extends RuntimeConfigTestSupport {
assertDeniedTemp("guest");
}
@Test
public void testWildcard() throws Exception {
final String brokerConfig = configurationSeed + "-auth-broker";
applyNewConfig(brokerConfig, configurationSeed + "-wildcard-users-guests");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
final String ALL_USERS = "ALL.USERS";
final String ALL_GUESTS = "ALL.GUESTS";
assertAllowed("user", ALL_USERS);
assertAllowed("guest", ALL_GUESTS);
assertDenied("user", ALL_USERS + "," + ALL_GUESTS);
assertDenied("guest", ALL_GUESTS + "," + ALL_USERS);
final String ALL_PREFIX = "ALL.>";
final String ALL_WILDCARD = "ALL.*";
assertAllowed("user", ALL_PREFIX);
assertAllowed("user", ALL_WILDCARD);
assertAllowed("guest", ALL_PREFIX);
assertAllowed("guest", ALL_WILDCARD);
assertAllowed("user", "ALL.USERS,ALL.>");
assertAllowed("guest", "ALL.GUESTS,ALL.*");
assertDenied("user", "ALL.GUESTS,ALL.>");
assertDenied("guest", "ALL.USERS,ALL.*");
assertDenied("user", "ALL.USERS,ALL.GUESTS.>");
assertDenied("guest", "ALL.GUESTS,ALL.USERS.*");
assertDenied("user", "ALL.USERS.*,ALL.GUESTS.>");
assertDenied("guest", "ALL.GUESTS.>,ALL.USERS.*");
// subscribe to wildcards and check whether messages are actually filtered
final ActiveMQConnection userConn = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection("user", "user");
final ActiveMQConnection guestConn = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection("guest", "guest");
userConn.start();
guestConn.start();
try {
final Session userSession = userConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Session guestSession = guestConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer userProducer = userSession.createProducer(null);
final MessageProducer guestProducer = guestSession.createProducer(null);
// test prefix filter
MessageConsumer userConsumer = userSession.createConsumer(userSession.createQueue(ALL_PREFIX));
MessageConsumer guestConsumer = guestSession.createConsumer(userSession.createQueue(ALL_PREFIX));
userProducer.send(userSession.createQueue(ALL_USERS), userSession.createTextMessage(ALL_USERS));
assertNotNull(userConsumer.receive(RECEIVE_TIMEOUT));
assertNull(guestConsumer.receive(RECEIVE_TIMEOUT));
guestProducer.send(guestSession.createQueue(ALL_GUESTS), guestSession.createTextMessage(ALL_GUESTS));
assertNotNull(guestConsumer.receive(RECEIVE_TIMEOUT));
assertNull(userConsumer.receive(RECEIVE_TIMEOUT));
userConsumer.close();
guestConsumer.close();
// test wildcard filter
userConsumer = userSession.createConsumer(userSession.createQueue(ALL_WILDCARD));
guestConsumer = guestSession.createConsumer(userSession.createQueue(ALL_WILDCARD));
userProducer.send(userSession.createQueue(ALL_USERS), userSession.createTextMessage(ALL_USERS));
assertNotNull(userConsumer.receive(RECEIVE_TIMEOUT));
assertNull(guestConsumer.receive(RECEIVE_TIMEOUT));
guestProducer.send(guestSession.createQueue(ALL_GUESTS), guestSession.createTextMessage(ALL_GUESTS));
assertNotNull(guestConsumer.receive(RECEIVE_TIMEOUT));
assertNull(userConsumer.receive(RECEIVE_TIMEOUT));
} finally {
userConn.close();
guestConn.close();
}
assertAllowedTemp("guest");
}
private void assertDeniedTemp(String userPass) {
try {
assertAllowedTemp(userPass);

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq-domain"/>
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ALL.USERS.>" read="users" write="users" admin="users"/>
<authorizationEntry queue="ALL.GUESTS.>" read="guests" write="guests,users" admin="guests,users"/>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins"/>
<authorizationEntry topic="ALL.USERS.>" read="users" write="users" admin="users"/>
<authorizationEntry topic="ALL.GUESTS.>" read="guests" write="guests,users" admin="guests,users"/>
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users"
admin="guests,users"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="tempDestinationAdmins,guests" write="tempDestinationAdmins,guests"
admin="tempDestinationAdmins,guests"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
</broker>
</beans>

View File

@ -281,6 +281,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
return null;
}
@Override
public boolean isWildcard() {
return false;
}
@Override
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -61,6 +61,7 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import junit.framework.TestCase;
public class SubscriptionAddRemoveQueueTest extends TestCase {
@ -322,6 +323,11 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return null;
}
@Override
public boolean isWildcard() {
return false;
}
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
return new ArrayList<MessageReference>(dispatched);