From 25703fbd1f27b65a7410acd7df0bfaf7c16845d8 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 13 Dec 2016 11:58:57 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6538 Fixing an issue with syncDurableSubs that cause a bridge failure when adding multiple bridges between the same brokers --- .../network/DurableConduitBridge.java | 20 ++++--- .../network/DurableSyncNetworkBridgeTest.java | 55 ++++++++++++++++++- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 50c9855d85..fb2b6c915e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -18,6 +18,7 @@ package org.apache.activemq.network; import java.io.IOException; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; @@ -95,14 +96,19 @@ public class DurableConduitBridge extends ConduitBridge { String candidateSubName = getSubscriberName(dest); for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { String subName = subscription.getConsumerInfo().getSubscriptionName(); - if (subName != null && subName.equals(candidateSubName)) { + if (subName != null && subName.equals(candidateSubName) && + subscription instanceof DurableTopicSubscription) { try { - // remove the NC subscription as it is no longer for a permissable dest - RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); - sending.setClientId(localClientId); - sending.setSubscriptionName(subName); - sending.setConnectionId(this.localConnectionInfo.getConnectionId()); - localBroker.oneway(sending); + DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription; + //check the clientId so we only remove subs for the matching bridge + if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) { + // remove the NC subscription as it is no longer for a permissible dest + RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); + sending.setClientId(localClientId); + sending.setSubscriptionName(subName); + sending.setConnectionId(this.localConnectionInfo.getConnectionId()); + localBroker.oneway(sending); + } } catch (IOException e) { LOG.debug("Exception removing NC durable subscription: {}", subName, e); serviceRemoteException(e); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 4a705f3e08..4e115a4142 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -76,7 +76,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { private boolean forceDurable = false; private boolean useVirtualDestSubs = false; private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; - public static enum FLOW {FORWARD, REVERSE}; + public static enum FLOW {FORWARD, REVERSE} private BrokerService broker1; private BrokerService broker2; @@ -535,6 +535,59 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { } + //Test that durable sync works with more than one bridge + @Test + public void testAddOnlineSubscriptionsTwoBridges() throws Exception { + + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName); + final ActiveMQTopic topic2 = new ActiveMQTopic("include.new.topic"); + + assertSubscriptionsCount(broker1, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); + + //create durable that shouldn't be propagated + session1.createDurableSubscriber(excludeTopic, "sub-exclude"); + + //Add 3 online subs + session1.createDurableSubscriber(topic, subName); + session1.createDurableSubscriber(topic, "sub2"); + session1.createDurableSubscriber(topic, "sub3"); + //Add sub on second topic/bridge + session1.createDurableSubscriber(topic2, "secondTopicSubName"); + assertSubscriptionsCount(broker1, topic, 3); + assertSubscriptionsCount(broker1, topic2, 1); + + //Add the second network connector + NetworkConnector secondConnector = configureLocalNetworkConnector(); + secondConnector.setName("networkConnector2"); + secondConnector.setDynamicallyIncludedDestinations( + Lists.newArrayList( + new ActiveMQTopic("include.new.topic?forceDurable=" + forceDurable))); + localBroker.addNetworkConnector(secondConnector); + secondConnector.start(); + + //Make sure both bridges are connected + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && + localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1; + } + }, 10000, 500)); + + //Make sure NC durables exist for both bridges + assertNCDurableSubsCount(broker2, topic2, 1); + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, excludeTopic, 0); + + //Make sure message can reach remote broker + MessageProducer producer = session2.createProducer(topic2); + producer.send(session2.createTextMessage("test")); + waitForDispatchFromLocalBroker(broker2.getDestination(topic2).getDestinationStatistics(), 1); + assertLocalBrokerStatistics(broker2.getDestination(topic2).getDestinationStatistics(), 1); + } + @Test(timeout = 60 * 1000) public void testVirtualDestSubForceDurableSync() throws Exception { Assume.assumeTrue(flow == FLOW.FORWARD);