AMQ-6875 - Use the correct destination for Virtual destination consumers

when using Virtual Topics

(cherry picked from commit 56baba96c6)
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-12-14 16:00:37 -05:00
parent c5d8a98106
commit 94aea677ac
2 changed files with 65 additions and 5 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.advisory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -42,6 +43,7 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@ -58,6 +60,7 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.filter.DestinationPath;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@ -604,7 +607,7 @@ public class AdvisoryBroker extends BrokerFilter {
if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
info.setDestination(virtualDestination.getVirtualDestination());
setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
@ -616,7 +619,7 @@ public class AdvisoryBroker extends BrokerFilter {
//this is the case of a real consumer coming online
} else {
info = info.copy();
info.setDestination(virtualDestination.getVirtualDestination());
setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
@ -626,6 +629,45 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
/**
* Sets the virtual destination on the ConsumerInfo
* If this is a VirtualTopic then the destination used will be the actual topic subscribed
* to in order to track demand properly
*
* @param info
* @param virtualDestination
* @param activeMQDest
*/
private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
info.setDestination(virtualDestination.getVirtualDestination());
if (virtualDestination instanceof VirtualTopic) {
VirtualTopic vt = (VirtualTopic) virtualDestination;
String prefix = vt.getPrefix() != null ? vt.getPrefix() : "";
String postfix = vt.getPostfix() != null ? vt.getPostfix() : "";
if (prefix.endsWith(".")) {
prefix = prefix.substring(0, prefix.length() - 1);
}
if (postfix.startsWith(".")) {
postfix = postfix.substring(1, postfix.length());
}
ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null;
ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null;
String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {};
String[] activeMQDestPaths = activeMQDest.getDestinationPaths();
String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {};
//sanity check
if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) {
String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length,
activeMQDestPaths.length - postfixPaths.length);
ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath));
info.setDestination(newTopic);
}
}
}
@Override
public void virtualDestinationRemoved(ConnectionContext context,
VirtualDestination virtualDestination) {

View File

@ -118,31 +118,49 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
* @throws Exception
*/
@Test(timeout = 60 * 1000)
public void testVirtualTopic() throws Exception {
public void testVirtualTopics() throws Exception {
Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
doSetUp(true, null);
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
MessageProducer includedProducer2 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2"));
MessageProducer includedProducer3 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3"));
Thread.sleep(2000);
Message test = localSession.createTextMessage("test");
final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
final DestinationStatistics destinationStatistics2 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics();
//No queue destination on the remote side so should not forward
final DestinationStatistics destinationStatistics3 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics();
//this will create the destination so messages accumulate
final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();
final DestinationStatistics remoteStats2 = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics();
waitForConsumerCount(destinationStatistics, 1);
waitForConsumerCount(destinationStatistics2, 1);
includedProducer.send(test);
includedProducer2.send(localSession.createTextMessage("test2"));
includedProducer3.send(localSession.createTextMessage("test3"));
//assert statistics
waitForDispatchFromLocalBroker(destinationStatistics, 1);
waitForDispatchFromLocalBroker(destinationStatistics2, 1);
assertLocalBrokerStatistics(destinationStatistics, 1);
assertLocalBrokerStatistics(destinationStatistics2, 1);
assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount());
assertEquals("remote dest messages", 1, remoteStats2.getMessages().getCount());
assertRemoteAdvisoryCount(advisoryConsumer, 1);
assertAdvisoryBrokerCounts(1,1,1);
assertRemoteAdvisoryCount(advisoryConsumer, 2);
assertAdvisoryBrokerCounts(1,2,2);
//should not have forwarded for 3rd topic
Thread.sleep(1000);
assertEquals("local broker dest stat dispatched", 0, destinationStatistics3.getDispatched().getCount());
}