From 05c31124021d18db8248f0a260d794e3b5d11823 Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Wed, 18 Feb 2015 18:29:05 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5594 - virtual topics and wildcards --- .../region/virtual/MappedQueueFilter.java | 49 ++--- .../broker/region/virtual/VirtualTopic.java | 3 +- .../activemq/filter/DestinationMapNode.java | 9 +- .../activemq/transport/mqtt/PahoMQTTTest.java | 171 +++++++++++++++--- .../activemq/filter/DestinationMapTest.java | 9 + ...erVirtualDestinationsWithWildcardTest.java | 2 + 6 files changed, 197 insertions(+), 46 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index 38ccf5da73..e8de9102e8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -48,32 +48,34 @@ public class MappedQueueFilter extends DestinationFilter { // recover messages for first consumer only boolean noSubs = getConsumers().isEmpty(); - super.addSubscription(context, sub); + if (!sub.getActiveMQDestination().isPattern() || sub.getActiveMQDestination().equals(next.getActiveMQDestination())) { + super.addSubscription(context, sub); - if (noSubs && !getConsumers().isEmpty()) { - // new subscription added, recover retroactive messages - final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); - final Set virtualDests = regionBroker.getDestinations(virtualDestination); + if (noSubs && !getConsumers().isEmpty()) { + // new subscription added, recover retroactive messages + final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); + final Set virtualDests = regionBroker.getDestinations(virtualDestination); - final ActiveMQDestination newDestination = sub.getActiveMQDestination(); - final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + final ActiveMQDestination newDestination = sub.getActiveMQDestination(); + final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); - for (Destination virtualDest : virtualDests) { - if (virtualDest.getActiveMQDestination().isTopic() && - (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { + for (Destination virtualDest : virtualDests) { + if (virtualDest.getActiveMQDestination().isTopic() && + (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { - Topic topic = (Topic) getBaseDestination(virtualDest); - if (topic != null) { - // re-use browse() to get recovered messages - final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); + Topic topic = (Topic) getBaseDestination(virtualDest); + if (topic != null) { + // re-use browse() to get recovered messages + final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); - // add recovered messages to subscription - for (Message message : messages) { - final Message copy = message.copy(); - copy.setOriginalDestination(message.getDestination()); - copy.setDestination(newDestination); - copy.setRegionDestination(regionDest); - sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); + // add recovered messages to subscription + for (Message message : messages) { + final Message copy = message.copy(); + copy.setOriginalDestination(message.getDestination()); + copy.setDestination(newDestination); + copy.setRegionDestination(regionDest); + sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); + } } } } @@ -99,4 +101,9 @@ public class MappedQueueFilter extends DestinationFilter { public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { super.deleteSubscription(context, key); } + + @Override + public String toString() { + return "MappedQueueFilter[" + virtualDestination + ", " + next + "]"; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index c6ab07e8ba..769c7848a5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -91,10 +91,11 @@ public class VirtualTopic implements VirtualDestination { @Override public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { - if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) { + if (destination.isQueue() && destination.isPattern()) { DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT)); if (filter.matches(destination)) { broker.addDestination(context, destination, false); + } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java index bd82a93937..4f6ad5a1bf 100755 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMapNode.java @@ -112,10 +112,15 @@ public class DestinationMapNode implements DestinationNode { @SuppressWarnings({ "rawtypes", "unchecked" }) protected void removeDesendentValues(Set answer) { + ArrayList candidates = new ArrayList<>(); for (Map.Entry child : childNodes.entrySet()) { + candidates.add(child.getValue()); + } + + for (DestinationNode node : candidates) { // remove all the values from the child - answer.addAll(child.getValue().removeValues()); - answer.addAll(child.getValue().removeDesendentValues()); + answer.addAll(node.removeValues()); + answer.addAll(node.removeDesendentValues()); } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index e5e5fe5161..263cafccd0 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -16,26 +16,22 @@ */ package org.apache.activemq.transport.mqtt; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.util.Wait; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.util.Wait; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@ -43,13 +39,6 @@ public class PahoMQTTTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); - @Override - @Before - public void setUp() throws Exception { - protocolConfig = "transport.activeMQSubscriptionPrefetch=32766"; - super.setUp(); - } - @Test(timeout = 300000) public void testLotsOfClients() throws Exception { @@ -140,6 +129,142 @@ public class PahoMQTTTest extends MQTTTestSupport { client.close(); } + @Test(timeout = 300000) + public void testSubs() throws Exception { + + stopBroker(); + protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; + startBroker(); + + final DefaultListener listener = new DefaultListener(); + // subscriber connects and creates durable sub + MqttClient client = createClient(false, "receive", listener); + + final String ACCOUNT_PREFIX = "test/"; + + + client.subscribe(ACCOUNT_PREFIX+"1/2/3"); + client.subscribe(ACCOUNT_PREFIX+"a/+/#"); + client.subscribe(ACCOUNT_PREFIX+"#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + String expectedResult = "should get everything"; + client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false); + + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + + assertTrue(client.getPendingDeliveryTokens().length == 0); + assertEquals(expectedResult, listener.result); + } + + @Test(timeout=300000) + public void testOverlappingTopics() throws Exception { + + stopBroker(); + protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; + startBroker(); + + final DefaultListener listener = new DefaultListener(); + // subscriber connects and creates durable sub + MqttClient client = createClient(false, "receive", listener); + + final String ACCOUNT_PREFIX = "test/"; + + // ***************************************** + // check a simple # subscribe works + // ***************************************** + client.subscribe(ACCOUNT_PREFIX+"#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + String expectedResult = "hello mqtt broker on hash"; + client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "hello mqtt broker on a different topic"; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"1/2/3/4/5/6", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + // ***************************************** + // now subscribe on a topic that overlaps the root # wildcard - we should still get everything + // ***************************************** + client.subscribe(ACCOUNT_PREFIX+"1/2/3"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "hello mqtt broker on explicit topic"; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"1/2/3", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "hello mqtt broker on some other topic"; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"a/b/c/d/e", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + // ***************************************** + // now unsub hash - we should only get called back on 1/2/3 + // ***************************************** + client.unsubscribe(ACCOUNT_PREFIX+"#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "this should not come back..."; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertNull(listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "this should not come back either..."; + listener.result = null; + client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }); + assertNull(listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + } + @Test(timeout = 300000) public void testCleanSession() throws Exception { String topic = "test"; @@ -237,6 +362,7 @@ public class PahoMQTTTest extends MQTTTestSupport { static class DefaultListener implements MqttCallback { int received = 0; + String result; @Override public void connectionLost(Throwable cause) { @@ -247,6 +373,7 @@ public class PahoMQTTTest extends MQTTTestSupport { public void messageArrived(String topic, MqttMessage message) throws Exception { LOG.info("Received: " + message); received++; + result = new String(message.getPayload()); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java index 2f0f92c0a4..3cfbc647d9 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java @@ -330,6 +330,15 @@ public class DestinationMapTest extends TestCase { assertMapValue("FOO.>", v2); } + public void testRemoveWildcard() throws Exception { + put("FOO.A", v1); + put("FOO.>", v2); + + map.removeAll(createDestination("FOO.>")); + + assertMapValue("FOO.A", null); + } + protected void loadSample2() { put("TEST.FOO", v1); put("TEST.*", v2); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java index f30cdb2d86..7c078b508a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java @@ -65,8 +65,10 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple sendReceive("local.test.1", true, "Consumer.a.local.test.1", false, 1, 1); sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1); + sendReceive("local.test.1.2", true, "Consumer.a.local.test.>", false, 1, 1); sendReceive("global.test.1", true, "Consumer.a.global.test.1", false, 1, 1); sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1); + sendReceive("global.test.1.2", true, "Consumer.a.global.test.>", false, 1, 1); destroyAllBrokers(); }