diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 17ad1e4cf9..3143cfc491 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -20,11 +20,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNotEquals; import java.net.ProtocolException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -44,11 +40,9 @@ import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.jaas.GroupPrincipal; -import org.apache.activemq.security.AuthorizationPlugin; -import org.apache.activemq.security.DefaultAuthorizationMap; -import org.apache.activemq.security.SimpleAuthenticationPlugin; -import org.apache.activemq.security.SimpleAuthorizationMap; +import org.apache.activemq.security.*; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; @@ -1294,6 +1288,58 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } + @Test(timeout = 60 * 1000) + public void testWildcardRetainedSubscription() throws Exception { + addMQTTConnector(); + + LinkedList users = new LinkedList(); + users.add(new AuthenticationUser("user", "user", "users")); + users.add(new AuthenticationUser("admin", "admin", "admins")); + final SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); + + + DefaultAuthorizationMap map = new DefaultAuthorizationMap(); + LinkedList authz = new LinkedList(); + + AuthorizationEntry entryOne = new AuthorizationEntry(); + entryOne.setDestination(new ActiveMQTopic("one")); + entryOne.setAdmin("admins"); + entryOne.setRead("admins"); + entryOne.setWrite("admins"); + authz.add(entryOne); + + AuthorizationEntry entryTwo = new AuthorizationEntry(); + entryTwo.setDestination(new ActiveMQTopic("two")); + entryTwo.setAdmin("users"); + entryTwo.setRead("users"); + entryTwo.setWrite("users"); + authz.add(entryTwo); + + map.setAuthorizationEntries(authz); + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(map); + + brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin }); + + brokerService.start(); + + MQTT mqttPub = createMQTTConnection("pub", true); + mqttPub.setUserName("admin"); + mqttPub.setPassword("admin"); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + connectionPub.connect(); + connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true); + + MQTT mqttSub = createMQTTConnection("sub", true); + mqttSub.setUserName("user"); + mqttSub.setPassword("user"); + BlockingConnection connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)}); + Message msg = connectionSub.receive(1, TimeUnit.SECONDS); + assertNull("Shouldn't receive the message", msg); + } + @Override protected String getProtocolScheme() { return "mqtt";