diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 1acd524664..1508c619ec 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -17,6 +17,7 @@ package org.apache.activemq.advisory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -42,6 +43,7 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualTopic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -58,6 +60,7 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SessionId; +import org.apache.activemq.filter.DestinationPath; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -604,7 +607,7 @@ public class AdvisoryBroker extends BrokerFilter { if(brokerConsumerDests.putIfAbsent(pair, info) == null) { LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info); - info.setDestination(virtualDestination.getVirtualDestination()); + setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { @@ -616,7 +619,7 @@ public class AdvisoryBroker extends BrokerFilter { //this is the case of a real consumer coming online } else { info = info.copy(); - info.setDestination(virtualDestination.getVirtualDestination()); + setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { @@ -626,6 +629,45 @@ public class AdvisoryBroker extends BrokerFilter { } } + /** + * Sets the virtual destination on the ConsumerInfo + * If this is a VirtualTopic then the destination used will be the actual topic subscribed + * to in order to track demand properly + * + * @param info + * @param virtualDestination + * @param activeMQDest + */ + private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) { + info.setDestination(virtualDestination.getVirtualDestination()); + if (virtualDestination instanceof VirtualTopic) { + VirtualTopic vt = (VirtualTopic) virtualDestination; + String prefix = vt.getPrefix() != null ? vt.getPrefix() : ""; + String postfix = vt.getPostfix() != null ? vt.getPostfix() : ""; + if (prefix.endsWith(".")) { + prefix = prefix.substring(0, prefix.length() - 1); + } + if (postfix.startsWith(".")) { + postfix = postfix.substring(1, postfix.length()); + } + ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null; + ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null; + + String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {}; + String[] activeMQDestPaths = activeMQDest.getDestinationPaths(); + String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {}; + + //sanity check + if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) { + String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length, + activeMQDestPaths.length - postfixPaths.length); + + ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath)); + info.setDestination(newTopic); + } + } + } + @Override public void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index 782f53f36e..af5c3167e2 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -118,31 +118,49 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport { * @throws Exception */ @Test(timeout = 60 * 1000) - public void testVirtualTopic() throws Exception { + public void testVirtualTopics() throws Exception { Assume.assumeTrue(isUseVirtualDestSubsOnCreation); doSetUp(true, null); MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>"); MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar")); + MessageProducer includedProducer2 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2")); + MessageProducer includedProducer3 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3")); Thread.sleep(2000); Message test = localSession.createTextMessage("test"); final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics(); + final DestinationStatistics destinationStatistics2 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics(); + + //No queue destination on the remote side so should not forward + final DestinationStatistics destinationStatistics3 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics(); //this will create the destination so messages accumulate final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics(); + final DestinationStatistics remoteStats2 = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics(); + waitForConsumerCount(destinationStatistics, 1); + waitForConsumerCount(destinationStatistics2, 1); includedProducer.send(test); + includedProducer2.send(localSession.createTextMessage("test2")); + includedProducer3.send(localSession.createTextMessage("test3")); //assert statistics waitForDispatchFromLocalBroker(destinationStatistics, 1); + waitForDispatchFromLocalBroker(destinationStatistics2, 1); assertLocalBrokerStatistics(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics2, 1); assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount()); + assertEquals("remote dest messages", 1, remoteStats2.getMessages().getCount()); - assertRemoteAdvisoryCount(advisoryConsumer, 1); - assertAdvisoryBrokerCounts(1,1,1); + assertRemoteAdvisoryCount(advisoryConsumer, 2); + assertAdvisoryBrokerCounts(1,2,2); + + //should not have forwarded for 3rd topic + Thread.sleep(1000); + assertEquals("local broker dest stat dispatched", 0, destinationStatistics3.getDispatched().getCount()); }