From f55edcfa25de1b55659a7113be60360c531ffa8a Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 29 Jul 2014 17:20:25 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5187 Allow virtual destination to recover retained messages. --- .../broker/region/DestinationFilter.java | 11 ++ .../region/virtual/CompositeDestination.java | 47 ++++- .../broker/region/virtual/CompositeQueue.java | 12 +- .../broker/region/virtual/CompositeTopic.java | 16 +- .../region/virtual/MappedQueueFilter.java | 86 +++++++++ .../region/virtual/VirtualDestination.java | 14 +- .../VirtualDestinationInterceptor.java | 35 +++- .../broker/region/virtual/VirtualTopic.java | 74 ++++++-- .../mqtt/MQTTCompositeQueueRetainedTest.java | 167 ++++++++++++++++++ .../activemq/transport/mqtt/MQTTTest.java | 51 ++++++ 10 files changed, 477 insertions(+), 36 deletions(-) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java create mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 001ac2f453..dfc3841337 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -406,4 +406,15 @@ public class DestinationFilter implements Destination { public Destination getNext() { return next; } + + public T getAdaptor(Class clazz) { + if (clazz.isInstance(this)) { + return clazz.cast(this); + } else if (next != null && clazz.isInstance(next)) { + return clazz.cast(next); + } else if (next instanceof DestinationFilter) { + return ((DestinationFilter)next).getAdaptor(clazz); + } + return null; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java index 72c35b6604..56588393ab 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java @@ -22,10 +22,11 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.CommandTypes; /** - * - * + * + * */ public abstract class CompositeDestination implements VirtualDestination { @@ -35,14 +36,17 @@ public abstract class CompositeDestination implements VirtualDestination { private boolean copyMessage = true; private boolean concurrentSend = false; + @Override public Destination intercept(Destination destination) { return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage(), isConcurrentSend()); } - + + @Override public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) { } - public void remove(Destination destination) { + @Override + public void remove(Destination destination) { } public String getName() { @@ -104,4 +108,39 @@ public abstract class CompositeDestination implements VirtualDestination { return this.concurrentSend; } + @Override + public ActiveMQDestination getMappedDestinations() { + + final ActiveMQDestination[] destinations = new ActiveMQDestination[forwardTo.size()]; + int i = 0; + for (Object dest : forwardTo) { + if (dest instanceof FilteredDestination) { + FilteredDestination filteredDestination = (FilteredDestination) dest; + destinations[i++] = filteredDestination.getDestination(); + } else if (dest instanceof ActiveMQDestination) { + destinations[i++] = (ActiveMQDestination) dest; + } else { + // highly unlikely, but just in case! + throw new IllegalArgumentException("Unknown mapped destination type " + dest); + } + } + + // used just for matching destination paths + return new ActiveMQDestination(destinations) { + @Override + protected String getQualifiedPrefix() { + return "mapped://"; + } + + @Override + public byte getDestinationType() { + return QUEUE_TYPE | TOPIC_TYPE; + } + + @Override + public byte getDataStructureType() { + return CommandTypes.ACTIVEMQ_QUEUE | CommandTypes.ACTIVEMQ_TOPIC; + } + }; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java index a425efdd67..1b0f75dae4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java @@ -22,14 +22,20 @@ import org.apache.activemq.command.ActiveMQQueue; /** * Represents a virtual queue which forwards to a number of other destinations. - * + * * @org.apache.xbean.XBean - * - * + * */ public class CompositeQueue extends CompositeDestination { + @Override public ActiveMQDestination getVirtualDestination() { return new ActiveMQQueue(getName()); } + + @Override + public Destination interceptMappedDestination(Destination destination) { + // nothing to do for mapped destinations + return destination; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java index c3087a8d95..667a80cee9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java @@ -16,19 +16,29 @@ */ package org.apache.activemq.broker.region.virtual; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; /** * Represents a virtual topic which forwards to a number of other destinations. - * + * * @org.apache.xbean.XBean - * - * + * */ public class CompositeTopic extends CompositeDestination { + @Override public ActiveMQDestination getVirtualDestination() { return new ActiveMQTopic(getName()); } + + @Override + public Destination interceptMappedDestination(Destination destination) { + if (!isForwardOnly() && destination.getActiveMQDestination().isQueue()) { + // recover retroactive messages in mapped Queue + return new MappedQueueFilter(getVirtualDestination(), destination); + } + return destination; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java new file mode 100644 index 0000000000..c97a257b4f --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -0,0 +1,86 @@ +package org.apache.activemq.broker.region.virtual; + +import java.util.Set; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.IndirectMessageReference; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.util.SubscriptionKey; + +/** + * Creates a mapped Queue that can recover messages from subscription recovery + * policy of its Virtual Topic. + */ +public class MappedQueueFilter extends DestinationFilter { + + private final ActiveMQDestination virtualDestination; + + public MappedQueueFilter(ActiveMQDestination virtualDestination, Destination destination) { + super(destination); + this.virtualDestination = virtualDestination; + } + + @Override + public synchronized void addSubscription(ConnectionContext context, Subscription sub) throws Exception { + // recover messages for first consumer only + boolean noSubs = getConsumers().isEmpty(); + + super.addSubscription(context, sub); + + if (noSubs && !getConsumers().isEmpty()) { + // new subscription added, recover retroactive messages + final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); + final Set virtualDests = regionBroker.getDestinations(virtualDestination); + + final ActiveMQDestination newDestination = sub.getActiveMQDestination(); + final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + + for (Destination virtualDest : virtualDests) { + if (virtualDest.getActiveMQDestination().isTopic() && + (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { + + Topic topic = (Topic) getBaseDestination(virtualDest); + if (topic != null) { + // re-use browse() to get recovered messages + final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); + + // add recovered messages to subscription + for (Message message : messages) { + final Message copy = message.copy(); + copy.setOriginalDestination(message.getDestination()); + copy.setDestination(newDestination); + copy.setRegionDestination(regionDest); + sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); + } + } + } + } + } + } + + private BaseDestination getBaseDestination(Destination virtualDest) { + if (virtualDest instanceof BaseDestination) { + return (BaseDestination) virtualDest; + } else if (virtualDest instanceof DestinationFilter) { + return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class); + } + return null; + } + + @Override + public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { + super.removeSubscription(context, sub, lastDeliveredSequenceId); + } + + @Override + public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { + super.deleteSubscription(context, key); + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java index 57d742c20b..1043c7a3b9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java @@ -22,8 +22,6 @@ import org.apache.activemq.command.ActiveMQDestination; /** * Represents some kind of virtual destination. - * - * */ public interface VirtualDestination extends DestinationInterceptor { @@ -35,5 +33,17 @@ public interface VirtualDestination extends DestinationInterceptor { /** * Creates a virtual destination from the physical destination */ + @Override Destination intercept(Destination destination); + + /** + * Returns mapped destination(s) + */ + ActiveMQDestination getMappedDestinations(); + + /** + * Creates a mapped destination + */ + Destination interceptMappedDestination(Destination destination); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java index bcb131c283..70be68679c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java @@ -34,21 +34,26 @@ import org.apache.activemq.filter.DestinationMap; /** * Implements Virtual Topics. - * + * href="http://activemq.apache.org/virtual-destinations.html">Virtual + * Topics. + * * @org.apache.xbean.XBean - * + * */ public class VirtualDestinationInterceptor implements DestinationInterceptor { private DestinationMap destinationMap = new DestinationMap(); + private DestinationMap mappedDestinationMap = new DestinationMap(); + private VirtualDestination[] virtualDestinations; + @Override public Destination intercept(Destination destination) { - Set matchingDestinations = destinationMap.get(destination.getActiveMQDestination()); + final ActiveMQDestination activeMQDestination = destination.getActiveMQDestination(); + Set matchingDestinations = destinationMap.get(activeMQDestination); List destinations = new ArrayList(); for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) { - VirtualDestination virtualDestination = (VirtualDestination)iter.next(); + VirtualDestination virtualDestination = (VirtualDestination) iter.next(); Destination newDestination = virtualDestination.intercept(destination); destinations.add(newDestination); } @@ -60,17 +65,28 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor { return createCompositeDestination(destination, destinations); } } + // check if the destination instead matches any mapped destinations + Set mappedDestinations = mappedDestinationMap.get(activeMQDestination); + assert mappedDestinations.size() < 2; + if (!mappedDestinations.isEmpty()) { + // create a mapped destination interceptor + VirtualDestination virtualDestination = (VirtualDestination) + mappedDestinations.toArray(new VirtualDestination[mappedDestinations.size()])[0]; + return virtualDestination.interceptMappedDestination(destination); + } + return destination; } - + @Override public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { - for (VirtualDestination virt: virtualDestinations) { + for (VirtualDestination virt : virtualDestinations) { virt.create(broker, context, destination); } } - public synchronized void remove(Destination destination) { + @Override + public synchronized void remove(Destination destination) { } public VirtualDestination[] getVirtualDestinations() { @@ -79,15 +95,18 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor { public void setVirtualDestinations(VirtualDestination[] virtualDestinations) { destinationMap = new DestinationMap(); + mappedDestinationMap = new DestinationMap(); this.virtualDestinations = virtualDestinations; for (int i = 0; i < virtualDestinations.length; i++) { VirtualDestination virtualDestination = virtualDestinations[i]; destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination); + mappedDestinationMap.put(virtualDestination.getMappedDestinations(), virtualDestination); } } protected Destination createCompositeDestination(Destination destination, final List destinations) { return new DestinationFilter(destination) { + @Override public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { for (Iterator iter = destinations.iterator(); iter.hasNext();) { Destination destination = iter.next(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index a4d8f135d2..c6ab07e8ba 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.broker.region.virtual; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; @@ -29,10 +32,8 @@ import org.apache.activemq.filter.DestinationFilter; * Topics using a prefix and postfix. The virtual destination creates a * wildcard that is then used to look up all active queue subscriptions which * match. - * + * * @org.apache.xbean.XBean - * - * */ public class VirtualTopic implements VirtualDestination { @@ -42,17 +43,53 @@ public class VirtualTopic implements VirtualDestination { private boolean selectorAware = false; private boolean local = false; - + @Override public ActiveMQDestination getVirtualDestination() { return new ActiveMQTopic(getName()); } + @Override public Destination intercept(Destination destination) { - return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : - new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()); + return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor( + destination, getPrefix(), getPostfix(), isLocal()); } - + @Override + public ActiveMQDestination getMappedDestinations() { + return new ActiveMQQueue(prefix + name + postfix); + } + + @Override + public Destination interceptMappedDestination(Destination destination) { + // do a reverse map from destination to get actual virtual destination + final String physicalName = destination.getActiveMQDestination().getPhysicalName(); + final Pattern pattern = Pattern.compile(getRegex(prefix) + "(.*)" + getRegex(postfix)); + final Matcher matcher = pattern.matcher(physicalName); + if (matcher.matches()) { + final String virtualName = matcher.group(1); + return new MappedQueueFilter(new ActiveMQTopic(virtualName), destination); + } + return destination; + } + + private String getRegex(String part) { + StringBuilder builder = new StringBuilder(); + for (char c : part.toCharArray()) { + switch (c) { + case '.': + builder.append("\\."); + break; + case '*': + builder.append("[^\\.]*"); + break; + default: + builder.append(c); + } + } + return builder.toString(); + } + + @Override public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) { DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT)); @@ -62,9 +99,10 @@ public class VirtualTopic implements VirtualDestination { } } - public void remove(Destination destination) { + @Override + public void remove(Destination destination) { } - + // Properties // ------------------------------------------------------------------------- @@ -98,17 +136,19 @@ public class VirtualTopic implements VirtualDestination { public void setName(String name) { this.name = name; } - + /** - * Indicates whether the selectors of consumers are used to determine dispatch - * to a virtual destination, when true only messages matching an existing - * consumer will be dispatched. - * @param selectorAware when true take consumer selectors into consideration + * Indicates whether the selectors of consumers are used to determine + * dispatch to a virtual destination, when true only messages matching an + * existing consumer will be dispatched. + * + * @param selectorAware + * when true take consumer selectors into consideration */ public void setSelectorAware(boolean selectorAware) { this.selectorAware = selectorAware; } - + public boolean isSelectorAware() { return selectorAware; } @@ -123,6 +163,8 @@ public class VirtualTopic implements VirtualDestination { @Override public String toString() { - return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').append(postfix).append(',').append(selectorAware).append(',').append(local).toString(); + return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(','). + append(postfix).append(',').append(selectorAware). + append(',').append(local).toString(); } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java new file mode 100644 index 0000000000..33b5039b5e --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; + +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.ByteSequence; +import org.junit.Test; + +/** + * + */ +public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport { + + // configure composite topic + private static final String COMPOSITE_TOPIC = "Composite.TopicA"; + private static final String FORWARD_QUEUE = "Composite.Queue.A"; + private static final String FORWARD_TOPIC = "Composite.Topic.A"; + + private static final int NUM_MESSAGES = 25; + + @Override + protected void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(isPersistent()); + brokerService.setAdvisorySupport(false); + brokerService.setSchedulerSupport(isSchedulerSupportEnabled()); + brokerService.setPopulateJMSXUserID(true); + + final CompositeTopic compositeTopic = new CompositeTopic(); + compositeTopic.setName(COMPOSITE_TOPIC); + final ArrayList forwardDestinations = new ArrayList(); + forwardDestinations.add(new ActiveMQQueue(FORWARD_QUEUE)); + forwardDestinations.add(new ActiveMQTopic(FORWARD_TOPIC)); + compositeTopic.setForwardTo(forwardDestinations); + // NOTE: allows retained messages to be set on the Composite + compositeTopic.setForwardOnly(false); + + final VirtualDestinationInterceptor destinationInterceptor = new VirtualDestinationInterceptor(); + destinationInterceptor.setVirtualDestinations(new VirtualDestination[] {compositeTopic} ); + brokerService.setDestinationInterceptors(new DestinationInterceptor[] { destinationInterceptor }); + } + + @Test(timeout = 60 * 1000) + public void testSendMQTTReceiveJMSCompositeDestinations() throws Exception { + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + + // send retained message + final String MQTT_TOPIC = "Composite/TopicA"; + final String RETAINED = "RETAINED"; + provider.publish(MQTT_TOPIC, RETAINED.getBytes(), AT_LEAST_ONCE, true); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection(); + // MUST set to true to receive retained messages + activeMQConnection.setUseRetroactiveConsumer(true); + activeMQConnection.setClientID("jms-client"); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue jmsQueue = s.createQueue(FORWARD_QUEUE); + javax.jms.Topic jmsTopic = s.createTopic(FORWARD_TOPIC); + + MessageConsumer queueConsumer = s.createConsumer(jmsQueue); + MessageConsumer topicConsumer = s.createDurableSubscriber(jmsTopic, "jms-subscription"); + + // check whether we received retained message twice on mapped Queue, once marked as RETAINED + ActiveMQMessage message; + ByteSequence bs; + for (int i = 0; i < 2; i++) { + message = (ActiveMQMessage) queueConsumer.receive(5000); + assertNotNull("Should get retained message from " + FORWARD_QUEUE, message); + bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY) != message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); + } + + // check whether we received retained message on mapped Topic + message = (ActiveMQMessage) topicConsumer.receive(5000); + assertNotNull("Should get retained message from " + FORWARD_TOPIC, message); + bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + assertFalse(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY)); + assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String payload = "Test Message: " + i; + provider.publish(MQTT_TOPIC, payload.getBytes(), AT_LEAST_ONCE); + + message = (ActiveMQMessage) queueConsumer.receive(5000); + assertNotNull("Should get a message from " + FORWARD_QUEUE, message); + bs = message.getContent(); + assertEquals(payload, new String(bs.data, bs.offset, bs.length)); + + message = (ActiveMQMessage) topicConsumer.receive(5000); + assertNotNull("Should get a message from " + FORWARD_TOPIC, message); + bs = message.getContent(); + assertEquals(payload, new String(bs.data, bs.offset, bs.length)); + } + + // close consumer and look for retained messages again + queueConsumer.close(); + topicConsumer.close(); + + queueConsumer = s.createConsumer(jmsQueue); + topicConsumer = s.createDurableSubscriber(jmsTopic, "jms-subscription"); + + // check whether we received retained message on mapped Queue, again + message = (ActiveMQMessage) queueConsumer.receive(5000); + assertNotNull("Should get recovered retained message from " + FORWARD_QUEUE, message); + bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); + assertNull("Should not get second retained message from " + FORWARD_QUEUE, queueConsumer.receive(5000)); + + // check whether we received retained message on mapped Topic, again + message = (ActiveMQMessage) topicConsumer.receive(5000); + assertNotNull("Should get recovered retained message from " + FORWARD_TOPIC, message); + bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); + assertNull("Should not get second retained message from " + FORWARD_TOPIC, topicConsumer.receive(5000)); + + // create second queue consumer and verify that it doesn't trigger message recovery + final MessageConsumer queueConsumer2 = s.createConsumer(jmsQueue); + assertNull("Second consumer MUST not receive retained message from " + FORWARD_QUEUE, queueConsumer2.receive(5000)); + + activeMQConnection.close(); + provider.disconnect(); + } +} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index bf81e98a4f..3fe3409efb 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -41,10 +41,12 @@ import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -1426,6 +1428,55 @@ public class MQTTTest extends MQTTTestSupport { assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]); } + @Test(timeout = 60 * 1000) + public void testSendMQTTReceiveJMSVirtualTopic() throws Exception { + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + final String DESTINATION_NAME = "Consumer.jms.VirtualTopic.TopicA"; + + // send retained message + final String RETAINED = "RETAINED"; + final String MQTT_DESTINATION_NAME = "VirtualTopic/TopicA"; + provider.publish(MQTT_DESTINATION_NAME, RETAINED.getBytes(), AT_LEAST_ONCE, true); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection(); + // MUST set to true to receive retained messages + activeMQConnection.setUseRetroactiveConsumer(true); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue jmsQueue = s.createQueue(DESTINATION_NAME); + MessageConsumer consumer = s.createConsumer(jmsQueue); + + // check whether we received retained message on JMS subscribe + ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000); + assertNotNull("Should get retained message", message); + ByteSequence bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String payload = "Test Message: " + i; + provider.publish(MQTT_DESTINATION_NAME, payload.getBytes(), AT_LEAST_ONCE); + message = (ActiveMQMessage) consumer.receive(5000); + assertNotNull("Should get a message", message); + bs = message.getContent(); + assertEquals(payload, new String(bs.data, bs.offset, bs.length)); + } + + // re-create consumer and check we received retained message again + consumer.close(); + consumer = s.createConsumer(jmsQueue); + message = (ActiveMQMessage) consumer.receive(5000); + assertNotNull("Should get retained message", message); + bs = message.getContent(); + assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); + assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); + + activeMQConnection.close(); + provider.disconnect(); + } + @Test(timeout = 60 * 1000) public void testPingOnMQTT() throws Exception { stopBroker();