diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 8a3a56a59c..70449f0086 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -652,12 +652,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br this.brokerService.getBrokerName(), subInfo.getBrokerName()); if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions() - && !configuration.isDynamicOnly() && subInfo.getSubscriptionInfos() != null) { + && !configuration.isDynamicOnly()) { if (started.get()) { - for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { - if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && - matchesDynamicallyIncludedDestinations(info.getDestination())) { - serviceRemoteConsumerAdvisory(info); + if (subInfo.getSubscriptionInfos() != null) { + for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { + if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && + matchesDynamicallyIncludedDestinations(info.getDestination())) { + serviceRemoteConsumerAdvisory(info); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index e699272c13..969c386528 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.transport.Transport; import org.apache.activemq.util.TypeConversionSupport; @@ -88,6 +89,30 @@ public class DurableConduitBridge extends ConduitBridge { LOG.error("Failed to add static destination {}", dest, e); } LOG.trace("Forwarding messages for durable destination: {}", dest); + } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) { + if (dest.isTopic()) { + RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); + TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); + + String candidateSubName = getSubscriberName(dest); + for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { + String subName = subscription.getConsumerInfo().getSubscriptionName(); + if (subName != null && subName.equals(candidateSubName)) { + try { + // remove the NC subscription as it is no longer for a permissable dest + RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); + sending.setClientId(localClientId); + sending.setSubscriptionName(subName); + sending.setConnectionId(this.localConnectionInfo.getConnectionId()); + localBroker.oneway(sending); + } catch (IOException e) { + LOG.debug("Exception removing NC durable subscription: {}", subName, e); + serviceRemoteException(e); + } + break; + } + } + } } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 67e9e24abf..62b3dec036 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; @@ -56,14 +57,18 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); + protected String staticIncludeTopics = "include.static.test"; + protected String includedTopics = "include.test.>"; protected String testTopicName2 = "include.test.bar2"; private boolean dynamicOnly = false; + private boolean forceDurable = false; private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; public static enum FLOW {FORWARD, REVERSE}; private BrokerService broker1; private BrokerService broker2; private Session session1; + private Session session2; private final FLOW flow; @Rule @@ -98,7 +103,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { @Before public void setUp() throws Exception { + includedTopics = "include.test.>"; + staticIncludeTopics = "include.static.test"; dynamicOnly = false; + forceDurable = false; remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder()); } @@ -135,6 +143,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { assertNCDurableSubsCount(broker2, topic, 1); restartBrokers(true); + assertBridgeStarted(); assertSubscriptionsCount(broker1, topic, 1); assertNCDurableSubsCount(broker2, topic, 1); @@ -157,6 +166,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { doTearDown(); restartBroker(broker1, false); + restartBroker(broker2, false); + + //Send some messages to the NC sub and make sure it can still be deleted + MessageProducer prod = session2.createProducer(topic); + for (int i = 0; i < 10; i++) { + prod.send(session2.createTextMessage("test")); + } + + assertSubscriptionsCount(broker1, topic, 1); + removeSubscription(broker1, topic, subName); + assertSubscriptionsCount(broker1, topic, 0); + doTearDown(); + + //Test that on successful reconnection of the bridge that + //the NC sub will be removed + restartBroker(broker2, true); + assertNCDurableSubsCount(broker2, topic, 1); + restartBroker(broker1, true); + assertBridgeStarted(); + assertNCDurableSubsCount(broker2, topic, 0); + + } + + @Test + public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName); + sub1.close(); + + assertSubscriptionsCount(broker1, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + doTearDown(); + + //change the included topics to make sure we still cleanup non-matching NC durables + includedTopics = "different.topic"; + restartBroker(broker1, false); assertSubscriptionsCount(broker1, topic, 1); removeSubscription(broker1, topic, subName); assertSubscriptionsCount(broker1, topic, 0); @@ -166,10 +212,76 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { restartBroker(broker2, true); assertNCDurableSubsCount(broker2, topic, 1); restartBroker(broker1, true); + assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); } + @Test + public void testSubscriptionRemovedAfterIncludedChanged() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName); + sub1.close(); + + assertSubscriptionsCount(broker1, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + doTearDown(); + + //change the included topics to make sure we still cleanup non-matching NC durables + includedTopics = "different.topic"; + restartBroker(broker1, false); + assertSubscriptionsCount(broker1, topic, 1); + + //Test that on successful reconnection of the bridge that + //the NC sub will be removed because even though the local subscription exists, + //it no longer matches the included filter + restartBroker(broker2, true); + assertNCDurableSubsCount(broker2, topic, 1); + restartBroker(broker1, true); + assertBridgeStarted(); + assertNCDurableSubsCount(broker2, topic, 0); + assertSubscriptionsCount(broker1, topic, 1); + + } + + @Test + public void testSubscriptionRemovedAfterStaticChanged() throws Exception { + forceDurable = true; + this.restartBrokers(true); + + final ActiveMQTopic topic = new ActiveMQTopic(this.staticIncludeTopics); + MessageConsumer sub1 = session1.createDurableSubscriber(topic, subName); + sub1.close(); + + assertSubscriptionsCount(broker1, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + doTearDown(); + + //change the included topics to make sure we still cleanup non-matching NC durables + staticIncludeTopics = "different.topic"; + this.restartBrokers(false); + assertSubscriptionsCount(broker1, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + + //Send some messages to the NC sub and make sure it can still be deleted + MessageProducer prod = session2.createProducer(topic); + for (int i = 0; i < 10; i++) { + prod.send(session2.createTextMessage("test")); + } + + //Test that on successful reconnection of the bridge that + //the NC sub will be removed because even though the local subscription exists, + //it no longer matches the included static filter + restartBroker(broker2, true); + assertNCDurableSubsCount(broker2, topic, 1); + restartBroker(broker1, true); + assertBridgeStarted(); + assertNCDurableSubsCount(broker2, topic, 0); + assertSubscriptionsCount(broker1, topic, 1); + } + @Test public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Exception { final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); @@ -199,9 +311,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { //After sync, remove old NC and create one for topic 2 restartBroker(broker1, true); + assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic2, 1); - } @Test @@ -225,6 +337,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { assertSubscriptionsCount(broker1, topic2, 1); restartBrokers(true); + assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic2, 1); assertNCDurableSubsCount(broker2, excludeTopic, 0); @@ -265,6 +378,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1); } + assertBridgeStarted(); } @@ -291,6 +405,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { //not be added restartBrokers(true); assertNCDurableSubsCount(broker2, topic, 0); + assertBridgeStarted(); } @@ -312,6 +427,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { restartBrokers(true); assertNCDurableSubsCount(broker2, topic, 0); + assertBridgeStarted(); } @Test @@ -335,6 +451,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { //bring online again session1.createDurableSubscriber(topic, subName); assertNCDurableSubsCount(broker2, topic, 1); + assertBridgeStarted(); } @@ -358,6 +475,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { restartBrokers(true); assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, excludeTopic, 0); + assertBridgeStarted(); } @@ -389,6 +507,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { //between the sync command and the online durables that are added over //the consumer advisory restartBrokers(true); + assertBridgeStarted(); //Re-create session1.createDurableSubscriber(topic, subName); @@ -460,7 +579,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { public boolean isSatisified() throws Exception { return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; } - }, 10000, 500); + }, 5000, 500); } localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -469,6 +588,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { session1 = localSession; } else { broker2 = localBroker; + session2 = localSession; } } @@ -486,6 +606,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { if (flow.equals(FLOW.FORWARD)) { broker2 = remoteBroker; + session2 = remoteSession; } else { broker1 = remoteBroker; session1 = remoteSession; @@ -524,8 +645,10 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { connector.setDuplex(true); connector.setStaticBridge(false); connector.setSyncDurableSubs(true); + connector.setStaticallyIncludedDestinations( + Lists.newArrayList(new ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable))); connector.setDynamicallyIncludedDestinations( - Lists.newArrayList(new ActiveMQTopic("include.test.>"))); + Lists.newArrayList(new ActiveMQTopic(includedTopics))); connector.setExcludedDestinations( Lists.newArrayList(new ActiveMQTopic(excludeTopicName))); return connector; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java index 0b388cc93d..4b8942ba7c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -88,6 +88,16 @@ public abstract class DynamicNetworkTestSupport { } } + + protected void assertBridgeStarted() throws Exception { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; + } + }, 10000, 500)); + } + protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context, final BrokerService brokerService) throws Exception { RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); @@ -181,6 +191,7 @@ public abstract class DynamicNetworkTestSupport { destination = (Topic) d; } + for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); @@ -189,6 +200,7 @@ public abstract class DynamicNetworkTestSupport { } } } + return subs; }