diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java index 3bd4f256a1..0cea4e538f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java @@ -283,14 +283,17 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da } List l = new ArrayList(); - StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR); - while (iter.hasMoreTokens()) { - String name = iter.nextToken().trim(); - if (name.length() == 0) { - continue; + StringBuilder level = new StringBuilder(); + final char separator = PATH_SEPERATOR.charAt(0); + for (char c : physicalName.toCharArray()) { + if (c == separator) { + l.add(level.toString()); + level.delete(0, level.length()); + } else { + level.append(c); } - l.add(name); } + l.add(level.toString()); destinationPaths = new String[l.size()]; l.toArray(destinationPaths); diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java index cf35ac8ef9..985df79b83 100644 --- a/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/AnyChildDestinationNode.java @@ -100,7 +100,6 @@ public class AnyChildDestinationNode implements DestinationNode { return answer; } - public Collection getChildren() { Collection answer = new ArrayList(); Iterator iter = getChildNodes().iterator(); diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java index 34f4b8a8a5..04245242fc 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationFilter.java @@ -53,7 +53,6 @@ public abstract class DestinationFilter implements BooleanExpression { return new CompositeDestinationFilter(destination); } String[] paths = DestinationPath.getDestinationPaths(destination); - paths = rationalizePaths(paths); int idx = paths.length - 1; if (idx >= 0) { String lastPath = paths[idx]; @@ -73,26 +72,4 @@ public abstract class DestinationFilter implements BooleanExpression { return new SimpleDestinationFilter(destination); } - /** - * Look for the case where any CHILD is followed by any decsendant - */ - public static String[] rationalizePaths(String[] paths) { - String[] result = paths; - if (paths != null && paths.length > 1) { - int last = paths.length - 1; - if (paths[last].equals(ANY_DESCENDENT)) { - last -= 1; - if (paths[last].equals(ANY_DESCENDENT) || paths[last].equals(ANY_CHILD)) { - - result = new String[paths.length-1]; - System.arraycopy(paths,0,result,0,result.length); - result[result.length-1] = ANY_DESCENDENT; - result = rationalizePaths(result); - } - } - } - - return result; - } - } diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java index 2c71578224..48a4cd3bbf 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java @@ -84,7 +84,6 @@ public class DestinationMap { return; } String[] paths = key.getDestinationPaths(); - paths = DestinationFilter.rationalizePaths(paths); getRootNode(key).add(paths, 0, value); } diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java index a2360a0d7e..bd82a93937 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java @@ -92,7 +92,7 @@ public class DestinationMapNode implements DestinationNode { } /** - * Returns a mutable List of the values available at this node in the tree + * Removes values available at this node in the tree */ @SuppressWarnings({ "rawtypes", "unchecked" }) public List removeValues() { @@ -112,7 +112,11 @@ public class DestinationMapNode implements DestinationNode { @SuppressWarnings({ "rawtypes", "unchecked" }) protected void removeDesendentValues(Set answer) { - answer.addAll(removeValues()); + for (Map.Entry child : childNodes.entrySet()) { + // remove all the values from the child + answer.addAll(child.getValue().removeValues()); + answer.addAll(child.getValue().removeDesendentValues()); + } } /** @@ -162,6 +166,7 @@ public class DestinationMapNode implements DestinationNode { break; } + // TODO is this correct, we are appending wildcard values here??? node.appendMatchingWildcards(answer, paths, i); if (path.equals(ANY_CHILD)) { // node = node.getAnyChildNode(); @@ -179,10 +184,9 @@ public class DestinationMapNode implements DestinationNode { @SuppressWarnings({ "rawtypes", "unchecked" }) public void appendDescendantValues(Set answer) { - answer.addAll(values); - - // lets add all the children too + // add children values, then recursively add their children for(DestinationNode child : childNodes.values()) { + answer.addAll(child.getValues()); child.appendDescendantValues(answer); } } @@ -208,6 +212,9 @@ public class DestinationMapNode implements DestinationNode { } wildCardNode = getChild(ANY_DESCENDENT); if (wildCardNode != null) { + // for a wildcard Node match, add all values of the descendant node + answer.addAll(wildCardNode.getValues()); + // and all descendants for paths like ">.>" answer.addAll(wildCardNode.getDesendentValues()); } } diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java b/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java index 5f9b6bc8f2..d83df467a3 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java @@ -36,7 +36,13 @@ public class PrefixDestinationFilter extends DestinationFilter { * @param prefixes */ public PrefixDestinationFilter(String[] prefixes, byte destinationType) { - this.prefixes = prefixes; + // collapse duplicate '>' at the end of the path + int lastIndex = prefixes.length - 1; + while (lastIndex >= 0 && ANY_DESCENDENT.equals(prefixes[lastIndex])) { + lastIndex--; + } + this.prefixes = new String[lastIndex + 2]; + System.arraycopy(prefixes, 0, this.prefixes, 0, this.prefixes.length); this.destinationType = destinationType; } @@ -59,11 +65,11 @@ public class PrefixDestinationFilter extends DestinationFilter { //want to look for the case where A matches A.> boolean match = true; for (int i = 0; (i < path.length && match); i++){ - match &= matches(prefixes[i],path[i]); + match = matches(prefixes[i], path[i]); } //paths get compacted - e.g. A.*.> will be compacted to A.> and by definition - the last element on //the prefix will be > - if (match && prefixes.length == (path.length + 1)){ + if (match && prefixes.length == (path.length + 1)) { return true; } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3e1e6cb857..9ece80eabb 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.transport.mqtt; -import java.net.ProtocolException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -326,6 +326,99 @@ public class MQTTTest extends AbstractMQTTTest { publisher.disconnect(); } + @Test(timeout=30000) + public void testValidZeroLengthClientId() throws Exception { + addMQTTConnector(); + brokerService.start(); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(""); + mqtt.setCleanSession(true); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.disconnect(); + } + + @Test(timeout = 60 * 1000) + public void testMQTTPathPatterns() throws Exception { + addMQTTConnector(); + brokerService.start(); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(""); + mqtt.setCleanSession(true); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String RETAINED = "RETAINED"; + String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"}; + for (String topic : topics) { + // test retained message + connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true); + + connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)}); + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(RETAINED + topic, new String(msg.getPayload())); + msg.ack(); + + // test non-retained message + connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false); + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + assertEquals(topic, new String(msg.getPayload())); + msg.ack(); + + connection.unsubscribe(new String[] {topic}); + } + connection.disconnect(); + + // test wildcard patterns with above topics + String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"}; + for (String wildcard : wildcards) { + final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*")); + + connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)}); + + // test retained messages + Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + do { + assertNotNull("RETAINED null " + wildcard, msg); + assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); + assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), + pattern.matcher(msg.getTopic()).matches()); + msg.ack(); + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + } while (msg != null); + + // connection is borked after timeout in connection.receive() + connection.disconnect(); + connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)}); + + // test non-retained message + for (String topic : topics) { + connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false); + } + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + do { + assertNotNull("Non-retained Null " + wildcard, msg); + assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), + pattern.matcher(msg.getTopic()).matches()); + msg.ack(); + msg = connection.receive(1000, TimeUnit.MILLISECONDS); + } while (msg != null); + + connection.unsubscribe(new String[] { wildcard }); + connection.disconnect(); + } + } + @Test(timeout = 60 * 1000) public void testMQTTRetainQoS() throws Exception { addMQTTConnector(); @@ -345,13 +438,7 @@ public class MQTTTest extends AbstractMQTTTest { public void onReceive(MQTTFrame frame) { // validate the QoS if (frame.messageType() == PUBLISH.TYPE) { - PUBLISH publish = new PUBLISH(); - try { - publish.decode(frame); - } catch (ProtocolException e) { - fail ("Failed decoding " + e.getMessage()); - } - actualQoS[0] = publish.qos().ordinal(); + actualQoS[0] = frame.qos().ordinal(); } } }); @@ -370,6 +457,7 @@ public class MQTTTest extends AbstractMQTTTest { waitCount++; } assertEquals(i, actualQoS[0]); + msg.ack(); connection.unsubscribe(new String[]{topic}); connection.disconnect(); @@ -392,13 +480,7 @@ public class MQTTTest extends AbstractMQTTTest { public void onReceive(MQTTFrame frame) { // validate the QoS if (frame.messageType() == PUBLISH.TYPE) { - PUBLISH publish = new PUBLISH(); - try { - publish.decode(frame); - } catch (ProtocolException e) { - fail("Failed decoding " + e.getMessage()); - } - actualQoS[0] = publish.qos().ordinal(); + actualQoS[0] = frame.qos().ordinal(); } } }); @@ -416,6 +498,7 @@ public class MQTTTest extends AbstractMQTTTest { final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); assertEquals(RETAIN, new String(msg.getPayload())); + msg.ack(); int waitCount = 0; while (actualQoS[0] == -1 && waitCount < 10) { Thread.sleep(1000); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java index 5de46b7543..2f0f92c0a4 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java @@ -23,11 +23,10 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import junit.framework.TestCase; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import junit.framework.TestCase; public class DestinationMapTest extends TestCase { protected DestinationMap map = new DestinationMap(); @@ -231,6 +230,25 @@ public class DestinationMapTest extends TestCase { assertMapValue("TEST.D1", null); } + public void testMQTTMappedWildcards() throws Exception { + put("TopicA", v1); + put(".TopicA", v2); + put("TopicA.", v3); + put(".", v4); + put("..TopicA", v5); + put("..", v6); + + // test wildcard patterns "#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+" + assertMapValue(">", v1, v2, v3, v4, v5, v6); + assertMapValue("*", v1); + assertMapValue("*.>", v1, v2, v3, v4, v5, v6); + assertMapValue(".*", v2, v4); + assertMapValue("*.", v3, v4); + assertMapValue("*.*", v2, v3, v4); + assertMapValue("*.*.", v6); + assertMapValue("*.*.*", v5, v6); + } + public void testStoreAndLookupAllWildcards() throws Exception { loadSample2();