mirror of https://github.com/apache/activemq.git
AMQ-6875 - Use the correct destination for Virtual destination consumers
when using Virtual Topics
This commit is contained in:
parent
d3e4393784
commit
56baba96c6
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.advisory;
|
package org.apache.activemq.advisory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -42,6 +43,7 @@ import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.broker.region.TopicRegion;
|
import org.apache.activemq.broker.region.TopicRegion;
|
||||||
import org.apache.activemq.broker.region.TopicSubscription;
|
import org.apache.activemq.broker.region.TopicSubscription;
|
||||||
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
||||||
|
import org.apache.activemq.broker.region.virtual.VirtualTopic;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -58,6 +60,7 @@ import org.apache.activemq.command.ProducerId;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||||
import org.apache.activemq.command.SessionId;
|
import org.apache.activemq.command.SessionId;
|
||||||
|
import org.apache.activemq.filter.DestinationPath;
|
||||||
import org.apache.activemq.security.SecurityContext;
|
import org.apache.activemq.security.SecurityContext;
|
||||||
import org.apache.activemq.state.ProducerState;
|
import org.apache.activemq.state.ProducerState;
|
||||||
import org.apache.activemq.usage.Usage;
|
import org.apache.activemq.usage.Usage;
|
||||||
|
@ -604,7 +607,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
|
|
||||||
if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
|
if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
|
||||||
LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
|
LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
|
||||||
info.setDestination(virtualDestination.getVirtualDestination());
|
setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
|
||||||
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
|
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
|
||||||
|
|
||||||
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
|
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
|
||||||
|
@ -616,7 +619,7 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
//this is the case of a real consumer coming online
|
//this is the case of a real consumer coming online
|
||||||
} else {
|
} else {
|
||||||
info = info.copy();
|
info = info.copy();
|
||||||
info.setDestination(virtualDestination.getVirtualDestination());
|
setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
|
||||||
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
|
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
|
||||||
|
|
||||||
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
|
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
|
||||||
|
@ -626,6 +629,45 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the virtual destination on the ConsumerInfo
|
||||||
|
* If this is a VirtualTopic then the destination used will be the actual topic subscribed
|
||||||
|
* to in order to track demand properly
|
||||||
|
*
|
||||||
|
* @param info
|
||||||
|
* @param virtualDestination
|
||||||
|
* @param activeMQDest
|
||||||
|
*/
|
||||||
|
private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
|
||||||
|
info.setDestination(virtualDestination.getVirtualDestination());
|
||||||
|
if (virtualDestination instanceof VirtualTopic) {
|
||||||
|
VirtualTopic vt = (VirtualTopic) virtualDestination;
|
||||||
|
String prefix = vt.getPrefix() != null ? vt.getPrefix() : "";
|
||||||
|
String postfix = vt.getPostfix() != null ? vt.getPostfix() : "";
|
||||||
|
if (prefix.endsWith(".")) {
|
||||||
|
prefix = prefix.substring(0, prefix.length() - 1);
|
||||||
|
}
|
||||||
|
if (postfix.startsWith(".")) {
|
||||||
|
postfix = postfix.substring(1, postfix.length());
|
||||||
|
}
|
||||||
|
ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null;
|
||||||
|
ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null;
|
||||||
|
|
||||||
|
String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {};
|
||||||
|
String[] activeMQDestPaths = activeMQDest.getDestinationPaths();
|
||||||
|
String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {};
|
||||||
|
|
||||||
|
//sanity check
|
||||||
|
if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) {
|
||||||
|
String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length,
|
||||||
|
activeMQDestPaths.length - postfixPaths.length);
|
||||||
|
|
||||||
|
ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath));
|
||||||
|
info.setDestination(newTopic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void virtualDestinationRemoved(ConnectionContext context,
|
public void virtualDestinationRemoved(ConnectionContext context,
|
||||||
VirtualDestination virtualDestination) {
|
VirtualDestination virtualDestination) {
|
||||||
|
|
|
@ -118,31 +118,49 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testVirtualTopic() throws Exception {
|
public void testVirtualTopics() throws Exception {
|
||||||
Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
|
Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
|
||||||
doSetUp(true, null);
|
doSetUp(true, null);
|
||||||
|
|
||||||
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
|
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
|
||||||
|
|
||||||
MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
|
MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
|
||||||
|
MessageProducer includedProducer2 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2"));
|
||||||
|
MessageProducer includedProducer3 = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3"));
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
Message test = localSession.createTextMessage("test");
|
Message test = localSession.createTextMessage("test");
|
||||||
|
|
||||||
final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
|
final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
|
||||||
|
final DestinationStatistics destinationStatistics2 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics();
|
||||||
|
|
||||||
|
//No queue destination on the remote side so should not forward
|
||||||
|
final DestinationStatistics destinationStatistics3 = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics();
|
||||||
|
|
||||||
//this will create the destination so messages accumulate
|
//this will create the destination so messages accumulate
|
||||||
final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();
|
final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();
|
||||||
|
final DestinationStatistics remoteStats2 = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics();
|
||||||
|
|
||||||
waitForConsumerCount(destinationStatistics, 1);
|
waitForConsumerCount(destinationStatistics, 1);
|
||||||
|
waitForConsumerCount(destinationStatistics2, 1);
|
||||||
|
|
||||||
includedProducer.send(test);
|
includedProducer.send(test);
|
||||||
|
includedProducer2.send(localSession.createTextMessage("test2"));
|
||||||
|
includedProducer3.send(localSession.createTextMessage("test3"));
|
||||||
|
|
||||||
//assert statistics
|
//assert statistics
|
||||||
waitForDispatchFromLocalBroker(destinationStatistics, 1);
|
waitForDispatchFromLocalBroker(destinationStatistics, 1);
|
||||||
|
waitForDispatchFromLocalBroker(destinationStatistics2, 1);
|
||||||
assertLocalBrokerStatistics(destinationStatistics, 1);
|
assertLocalBrokerStatistics(destinationStatistics, 1);
|
||||||
|
assertLocalBrokerStatistics(destinationStatistics2, 1);
|
||||||
assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount());
|
assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount());
|
||||||
|
assertEquals("remote dest messages", 1, remoteStats2.getMessages().getCount());
|
||||||
|
|
||||||
assertRemoteAdvisoryCount(advisoryConsumer, 1);
|
assertRemoteAdvisoryCount(advisoryConsumer, 2);
|
||||||
assertAdvisoryBrokerCounts(1,1,1);
|
assertAdvisoryBrokerCounts(1,2,2);
|
||||||
|
|
||||||
|
//should not have forwarded for 3rd topic
|
||||||
|
Thread.sleep(1000);
|
||||||
|
assertEquals("local broker dest stat dispatched", 0, destinationStatistics3.getDispatched().getCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue