https://issues.apache.org/jira/browse/AMQ-5639 - the duplex case needed work. All advisories were being acked async in duplex mode, that code needed to be more selective to forward advisories that dont terminate at the bridge. Fix and test

This commit is contained in:
gtully 2015-06-26 14:54:11 +01:00
parent 13c471cc11
commit 002ade79b0
3 changed files with 81 additions and 5 deletions

View File

@ -619,8 +619,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command;
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|| AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
ackAdvisory(message);
} else {
@ -989,7 +988,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message
});
if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
try {
// never request b/c they are eventually acked async
remoteBroker.oneway(message);

View File

@ -97,7 +97,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
}
if (message.isAdvisory()) {
if (consumerInfo != null && consumerInfo.isNetworkSubscription() && advisoryIsInterpretedByNetworkBridge(message)) {
if (consumerInfo != null && consumerInfo.isNetworkSubscription() && isAdvisoryInterpretedByNetworkBridge(message)) {
// they will be interpreted by the bridge leading to dup commands
if (LOG.isTraceEnabled()) {
LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
@ -124,7 +124,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
return true;
}
private boolean advisoryIsInterpretedByNetworkBridge(Message message) {
public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
}

View File

@ -17,10 +17,16 @@
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Arrays;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.NetworkConnector;
@ -71,6 +77,33 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
}
public void testAdvisoryForwardingDuplexNC() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");
createBroker("A");
createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.setDuplex(true);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
MessageConsumer consumerA = createConsumer("A", advisoryTopic);
MessageConsumer consumerB = createConsumer("B", advisoryTopic);
this.sendMessages("A", new ActiveMQTopic("FOO"), 1);
MessageIdList messagesA = getConsumerMessages("A", consumerA);
MessageIdList messagesB = getConsumerMessages("B", consumerB);
LOG.info("consumerA = " + messagesA);
LOG.info("consumerB = " + messagesB);
messagesA.assertMessagesReceived(2);
messagesB.assertMessagesReceived(2);
}
public void testBridgeRelevantAdvisoryNotAvailable() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.FOO");
createBroker("A");
@ -97,6 +130,50 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
messagesB.assertMessagesReceived(0);
}
public void testAdvisoryViaVirtualDest() throws Exception {
ActiveMQQueue advisoryQueue = new ActiveMQQueue("advQ");
createBroker("A");
// convert advisories into advQ that cross the network bridge
CompositeTopic compositeTopic = new CompositeTopic();
compositeTopic.setName("ActiveMQ.Advisory.Connection");
compositeTopic.setForwardOnly(false);
compositeTopic.setForwardTo(Arrays.asList(advisoryQueue));
VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{compositeTopic});
brokers.get("A").broker.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});
createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.setDuplex(true);
networkBridge.setPrefetchSize(1); // so advisories are acked immediately b/c we check inflight count below
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
verifyPeerBrokerInfo(brokers.get("B"), 1);
MessageConsumer consumerB = createConsumer("B", advisoryQueue);
// to make a connection on A
createConsumer("A", new ActiveMQTopic("FOO"));
MessageIdList messagesB = getConsumerMessages("B", consumerB);
messagesB.waitForMessagesToArrive(2);
assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount());
LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount());
return regionBroker.getDestinationStatistics().getDequeues().getCount() > 2
&& regionBroker.getDestinationStatistics().getInflight().getCount() == 0;
}
}));
}
private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
final BrokerService broker = brokerItem.broker;
final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();