Fix the NPE case
This commit is contained in:
Timothy Bish 2014-07-31 17:23:35 -04:00
parent 47ebd80b6b
commit 5de0c8e2fb
2 changed files with 9 additions and 6 deletions

View File

@ -40,6 +40,7 @@ import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMapMessage;
@ -500,7 +501,11 @@ public class MQTTProtocolConverter {
for (Subscription subscription : dest.getConsumers()) { for (Subscription subscription : dest.getConsumers()) {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try { try {
((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); if (dest instanceof org.apache.activemq.broker.region.Topic) {
((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
} else if (dest instanceof VirtualTopicInterceptor) {
((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
}
if (subscription instanceof PrefetchSubscription) { if (subscription instanceof PrefetchSubscription) {
// request dispatch for prefetch subs // request dispatch for prefetch subs
PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription; PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
@ -917,7 +922,6 @@ public class MQTTProtocolConverter {
case '>': case '>':
chars[i] = '#'; chars[i] = '#';
break; break;
case '+': case '+':
chars[i] = '*'; chars[i] = '*';
break; break;

View File

@ -518,14 +518,14 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals(RETAIN, new String(msg.getPayload())); assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack(); msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
connection.unsubscribe(new String[]{"TopicA"}); connection.unsubscribe(new String[]{TOPICA});
// clear retained message and check that we don't receive it // clear retained message and check that we don't receive it
connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true); connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(500, TimeUnit.MILLISECONDS); msg = connection.receive(500, TimeUnit.MILLISECONDS);
assertNull("Retained message not cleared for " + clientId, msg); assertNull("Retained message not cleared for " + clientId, msg);
connection.unsubscribe(new String[]{"TopicA"}); connection.unsubscribe(new String[]{TOPICA});
// set retained message again and check // set retained message again and check
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
@ -547,8 +547,7 @@ public class MQTTTest extends MQTTTestSupport {
msg.ack(); msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
connection.unsubscribe(new String[]{"TopicA"}); connection.unsubscribe(new String[]{TOPICA});
connection.disconnect(); connection.disconnect();
} }
} }