diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index eb5bb2b7cf..4ca23c370f 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -40,6 +40,7 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMapMessage; @@ -500,7 +501,11 @@ public class MQTTProtocolConverter { for (Subscription subscription : dest.getConsumers()) { if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { try { - ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); + if (dest instanceof org.apache.activemq.broker.region.Topic) { + ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); + } else if (dest instanceof VirtualTopicInterceptor) { + ((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription); + } if (subscription instanceof PrefetchSubscription) { // request dispatch for prefetch subs PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription; @@ -917,7 +922,6 @@ public class MQTTProtocolConverter { case '>': chars[i] = '#'; break; - case '+': chars[i] = '*'; break; diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index e332c48e23..c4571dc500 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -518,14 +518,14 @@ public class MQTTTest extends MQTTTestSupport { assertEquals(RETAIN, new String(msg.getPayload())); msg.ack(); assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); - connection.unsubscribe(new String[]{"TopicA"}); + connection.unsubscribe(new String[]{TOPICA}); // clear retained message and check that we don't receive it connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(500, TimeUnit.MILLISECONDS); assertNull("Retained message not cleared for " + clientId, msg); - connection.unsubscribe(new String[]{"TopicA"}); + connection.unsubscribe(new String[]{TOPICA}); // set retained message again and check connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); @@ -547,8 +547,7 @@ public class MQTTTest extends MQTTTestSupport { msg.ack(); assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); - connection.unsubscribe(new String[]{"TopicA"}); - + connection.unsubscribe(new String[]{TOPICA}); connection.disconnect(); } }