Applying patch AMQ-5074: MQTT paths with empty levels are not handled correctly.

Thanks Dhiraj!
This commit is contained in:
Hiram Chirino 2014-02-24 09:05:23 -05:00
parent 2b3c477752
commit e7e317dc7e
8 changed files with 148 additions and 56 deletions

View File

@ -283,14 +283,17 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da
}
List<String> l = new ArrayList<String>();
StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
while (iter.hasMoreTokens()) {
String name = iter.nextToken().trim();
if (name.length() == 0) {
continue;
StringBuilder level = new StringBuilder();
final char separator = PATH_SEPERATOR.charAt(0);
for (char c : physicalName.toCharArray()) {
if (c == separator) {
l.add(level.toString());
level.delete(0, level.length());
} else {
level.append(c);
}
l.add(name);
}
l.add(level.toString());
destinationPaths = new String[l.size()];
l.toArray(destinationPaths);

View File

@ -100,7 +100,6 @@ public class AnyChildDestinationNode implements DestinationNode {
return answer;
}
public Collection getChildren() {
Collection answer = new ArrayList();
Iterator iter = getChildNodes().iterator();

View File

@ -53,7 +53,6 @@ public abstract class DestinationFilter implements BooleanExpression {
return new CompositeDestinationFilter(destination);
}
String[] paths = DestinationPath.getDestinationPaths(destination);
paths = rationalizePaths(paths);
int idx = paths.length - 1;
if (idx >= 0) {
String lastPath = paths[idx];
@ -73,26 +72,4 @@ public abstract class DestinationFilter implements BooleanExpression {
return new SimpleDestinationFilter(destination);
}
/**
* Look for the case where any CHILD is followed by any decsendant
*/
public static String[] rationalizePaths(String[] paths) {
String[] result = paths;
if (paths != null && paths.length > 1) {
int last = paths.length - 1;
if (paths[last].equals(ANY_DESCENDENT)) {
last -= 1;
if (paths[last].equals(ANY_DESCENDENT) || paths[last].equals(ANY_CHILD)) {
result = new String[paths.length-1];
System.arraycopy(paths,0,result,0,result.length);
result[result.length-1] = ANY_DESCENDENT;
result = rationalizePaths(result);
}
}
}
return result;
}
}

View File

@ -84,7 +84,6 @@ public class DestinationMap {
return;
}
String[] paths = key.getDestinationPaths();
paths = DestinationFilter.rationalizePaths(paths);
getRootNode(key).add(paths, 0, value);
}

View File

@ -92,7 +92,7 @@ public class DestinationMapNode implements DestinationNode {
}
/**
* Returns a mutable List of the values available at this node in the tree
* Removes values available at this node in the tree
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public List removeValues() {
@ -112,7 +112,11 @@ public class DestinationMapNode implements DestinationNode {
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void removeDesendentValues(Set answer) {
answer.addAll(removeValues());
for (Map.Entry<String, DestinationNode> child : childNodes.entrySet()) {
// remove all the values from the child
answer.addAll(child.getValue().removeValues());
answer.addAll(child.getValue().removeDesendentValues());
}
}
/**
@ -162,6 +166,7 @@ public class DestinationMapNode implements DestinationNode {
break;
}
// TODO is this correct, we are appending wildcard values here???
node.appendMatchingWildcards(answer, paths, i);
if (path.equals(ANY_CHILD)) {
// node = node.getAnyChildNode();
@ -179,10 +184,9 @@ public class DestinationMapNode implements DestinationNode {
@SuppressWarnings({ "rawtypes", "unchecked" })
public void appendDescendantValues(Set answer) {
answer.addAll(values);
// lets add all the children too
// add children values, then recursively add their children
for(DestinationNode child : childNodes.values()) {
answer.addAll(child.getValues());
child.appendDescendantValues(answer);
}
}
@ -208,6 +212,9 @@ public class DestinationMapNode implements DestinationNode {
}
wildCardNode = getChild(ANY_DESCENDENT);
if (wildCardNode != null) {
// for a wildcard Node match, add all values of the descendant node
answer.addAll(wildCardNode.getValues());
// and all descendants for paths like ">.>"
answer.addAll(wildCardNode.getDesendentValues());
}
}

View File

@ -36,7 +36,13 @@ public class PrefixDestinationFilter extends DestinationFilter {
* @param prefixes
*/
public PrefixDestinationFilter(String[] prefixes, byte destinationType) {
this.prefixes = prefixes;
// collapse duplicate '>' at the end of the path
int lastIndex = prefixes.length - 1;
while (lastIndex >= 0 && ANY_DESCENDENT.equals(prefixes[lastIndex])) {
lastIndex--;
}
this.prefixes = new String[lastIndex + 2];
System.arraycopy(prefixes, 0, this.prefixes, 0, this.prefixes.length);
this.destinationType = destinationType;
}
@ -59,11 +65,11 @@ public class PrefixDestinationFilter extends DestinationFilter {
//want to look for the case where A matches A.>
boolean match = true;
for (int i = 0; (i < path.length && match); i++){
match &= matches(prefixes[i],path[i]);
match = matches(prefixes[i], path[i]);
}
//paths get compacted - e.g. A.*.> will be compacted to A.> and by definition - the last element on
//the prefix will be >
if (match && prefixes.length == (path.length + 1)){
if (match && prefixes.length == (path.length + 1)) {
return true;
}
}

View File

@ -16,11 +16,11 @@
*/
package org.apache.activemq.transport.mqtt;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -326,6 +326,99 @@ public class MQTTTest extends AbstractMQTTTest {
publisher.disconnect();
}
@Test(timeout=30000)
public void testValidZeroLengthClientId() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setCleanSession(true);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testMQTTPathPatterns() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setCleanSession(true);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String RETAINED = "RETAINED";
String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"};
for (String topic : topics) {
// test retained message
connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(RETAINED + topic, new String(msg.getPayload()));
msg.ack();
// test non-retained message
connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(topic, new String(msg.getPayload()));
msg.ack();
connection.unsubscribe(new String[] {topic});
}
connection.disconnect();
// test wildcard patterns with above topics
String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"};
for (String wildcard : wildcards) {
final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*"));
connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
// test retained messages
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(),
pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
} while (msg != null);
// connection is borked after timeout in connection.receive()
connection.disconnect();
connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
// test non-retained message
for (String topic : topics) {
connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
}
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
do {
assertNotNull("Non-retained Null " + wildcard, msg);
assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(),
pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
} while (msg != null);
connection.unsubscribe(new String[] { wildcard });
connection.disconnect();
}
}
@Test(timeout = 60 * 1000)
public void testMQTTRetainQoS() throws Exception {
addMQTTConnector();
@ -345,13 +438,7 @@ public class MQTTTest extends AbstractMQTTTest {
public void onReceive(MQTTFrame frame) {
// validate the QoS
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
publish.decode(frame);
} catch (ProtocolException e) {
fail ("Failed decoding " + e.getMessage());
}
actualQoS[0] = publish.qos().ordinal();
actualQoS[0] = frame.qos().ordinal();
}
}
});
@ -370,6 +457,7 @@ public class MQTTTest extends AbstractMQTTTest {
waitCount++;
}
assertEquals(i, actualQoS[0]);
msg.ack();
connection.unsubscribe(new String[]{topic});
connection.disconnect();
@ -392,13 +480,7 @@ public class MQTTTest extends AbstractMQTTTest {
public void onReceive(MQTTFrame frame) {
// validate the QoS
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
publish.decode(frame);
} catch (ProtocolException e) {
fail("Failed decoding " + e.getMessage());
}
actualQoS[0] = publish.qos().ordinal();
actualQoS[0] = frame.qos().ordinal();
}
}
});
@ -416,6 +498,7 @@ public class MQTTTest extends AbstractMQTTTest {
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
int waitCount = 0;
while (actualQoS[0] == -1 && waitCount < 10) {
Thread.sleep(1000);

View File

@ -23,11 +23,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import junit.framework.TestCase;
public class DestinationMapTest extends TestCase {
protected DestinationMap map = new DestinationMap();
@ -231,6 +230,25 @@ public class DestinationMapTest extends TestCase {
assertMapValue("TEST.D1", null);
}
public void testMQTTMappedWildcards() throws Exception {
put("TopicA", v1);
put(".TopicA", v2);
put("TopicA.", v3);
put(".", v4);
put("..TopicA", v5);
put("..", v6);
// test wildcard patterns "#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"
assertMapValue(">", v1, v2, v3, v4, v5, v6);
assertMapValue("*", v1);
assertMapValue("*.>", v1, v2, v3, v4, v5, v6);
assertMapValue(".*", v2, v4);
assertMapValue("*.", v3, v4);
assertMapValue("*.*", v2, v3, v4);
assertMapValue("*.*.", v6);
assertMapValue("*.*.*", v5, v6);
}
public void testStoreAndLookupAllWildcards() throws Exception {
loadSample2();