test case for AMQ-4356 validates this

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1489471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-04 15:27:18 +00:00
parent b8a7e9990b
commit ba37cbb1de
3 changed files with 15 additions and 10 deletions

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.SubscriptionKey;
/** /**
* *
@ -386,4 +387,10 @@ public class DestinationFilter implements Destination {
public boolean isDLQ() { public boolean isDLQ() {
return next.isDLQ(); return next.isDLQ();
} }
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof Topic) {
((Topic) next).deleteSubscription(context, key);
}
}
} }

View File

@ -31,7 +31,6 @@ import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
@ -198,9 +197,9 @@ public class TopicRegion extends AbstractRegion {
if (dest instanceof Topic){ if (dest instanceof Topic){
Topic topic = (Topic)dest; Topic topic = (Topic)dest;
topic.deleteSubscription(context, key); topic.deleteSubscription(context, key);
} else if (dest instanceof VirtualTopicInterceptor) { } else if (dest instanceof DestinationFilter) {
VirtualTopicInterceptor virtualTopic = (VirtualTopicInterceptor) dest; DestinationFilter filter = (DestinationFilter) dest;
virtualTopic.getTopic().deleteSubscription(context, key); filter.deleteSubscription(context, key);
} }
} }
} finally { } finally {

View File

@ -26,15 +26,14 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.LRUCache;
/** /**
* A Destination which implements <a * A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
* href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
*/ */
public class VirtualTopicInterceptor extends DestinationFilter { public class VirtualTopicInterceptor extends DestinationFilter {
private final String prefix; private final String prefix;
private final String postfix; private final String postfix;
private final boolean local; private final boolean local;
private final LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new LRUCache<ActiveMQDestination,ActiveMQQueue>(); private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next); super(next);
@ -58,11 +57,11 @@ public class VirtualTopicInterceptor extends DestinationFilter {
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) { protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
ActiveMQQueue queue; ActiveMQQueue queue;
synchronized(cache){ synchronized (cache) {
queue = cache.get(original); queue = cache.get(original);
if (queue==null){ if (queue == null) {
queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix); queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
cache.put(original,queue); cache.put(original, queue);
} }
} }
return queue; return queue;