diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index 6ced8961a1..bc9d004132 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.SubscriptionInfo; @@ -80,7 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge { ds.addForcedDurableConsumer(info.getConsumerId()); } } else { - ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + if (isProxyNSConsumer(info)) { + final BrokerId[] path = info.getBrokerPath(); + addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); + } else { + ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + } } matched = true; // continue - we want interest to any existing DemandSubscriptions diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index efdfa5a7dc..03e79e4b56 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; @@ -94,7 +95,6 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.failover.FailoverTransport; -import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; @@ -666,11 +666,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br * @param info * @return */ - protected boolean isBridgeNS(ConsumerInfo info) { + protected boolean isDirectBridgeConsumer(ConsumerInfo info) { return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) && (info.getClientId() == null || info.getClientId().startsWith(configuration.getName())); } + private boolean isProxyBridgeSubscription(SubscriptionInfo info) { + if (info.getSubcriptionName() != null && info.getClientId() != null) { + if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) + && !info.getClientId().startsWith(configuration.getName())) { + return true; + } + } + return false; + } + + protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) { + if (sub != null && path.length > 1 && subName != null) { + String b1 = path[path.length-1].toString(); + String b2 = path[path.length-2].toString(); + final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName); + sub.getDurableRemoteSubs().add(newSubInfo); + sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet(); + LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo); + } else { + LOG.debug("Skipping addProxyNetworkSubscription"); + } + } + + private String getProxyBridgeClientId(SubscriptionInfo info) { + String[] clientIdTokens = info.getClientId().split("_"); + String newClientId = ""; + if (clientIdTokens.length > 2) { + for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) { + newClientId += clientIdTokens[j]; + if (j < clientIdTokens.length -1) { + newClientId += "_"; + } + } + } + return newClientId; + } + + protected boolean isProxyNSConsumer(ConsumerInfo info) { + return info.getBrokerPath() != null && info.getBrokerPath().length > 1; + } + protected void serviceRemoteCommand(Command command) { if (!disposed.get()) { try { @@ -706,7 +747,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br //dynamicallyIncludedDestinations list //Also re-add network consumers that are not part of this direct //bridge (proxy of proxy bridges) - if((info.getSubscriptionName() == null || !isBridgeNS(info)) && + if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) && NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) { serviceRemoteConsumerAdvisory(info); } @@ -975,8 +1016,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { DemandSubscription ds = i.next(); boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); + if (removed) { cleanupDurableSub(ds, i); + //If this is a proxy bridge subscription we need to try changing the clientId + } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){ + subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo)); + if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) { + AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger()); + count.decrementAndGet(); + //Only remove the durable remote sub if the count <= 0 + if (count.get() <= 0) { + ds.getDurableRemoteSubs().remove(subscriptionInfo); + ds.getNetworkDemandConsumerMap().remove(subscriptionInfo); + cleanupDurableSub(ds, i); + } + } } } } @@ -984,6 +1039,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private void cleanupDurableSub(final DemandSubscription ds, Iterator i) throws IOException { + if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty() && ds.getForcedDurableConsumersSize() == 0) { // deactivate subscriber @@ -998,9 +1054,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.oneway(sending); //remove subscriber from map - if (i != null) { - i.remove(); - } + i.remove(); } } @@ -1094,18 +1148,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public void run() { sub.waitForCompletion(); try { - //If removing a network durable subscription that still has durable remote subs - //make sure we cleanup the durable subscription properly - necessary when using - //durable subscriptions and 3 or more brokers - if (configuration.isConduitSubscriptions() && - sub.getLocalInfo().getSubscriptionName() != null && - sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && - sub.getDurableRemoteSubs().size() > 0) { - sub.getDurableRemoteSubs().clear(); - cleanupDurableSub(sub, null); - } else { - localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); - } + localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); } catch (IOException e) { LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); } @@ -1367,7 +1410,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br undoMapRegistration(sub); } else { if (consumerInfo.isDurable()) { - sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); + if (isProxyNSConsumer(sub.getRemoteInfo())) { + BrokerId[] path = sub.getRemoteInfo().getBrokerPath(); + addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName()); + } else { + sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); + } } addSubscription(sub); LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java index 371df0ae49..96a9baf963 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.network; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,6 +44,8 @@ public class DemandSubscription { private final AtomicBoolean activeWaiter = new AtomicBoolean(); private final Set durableRemoteSubs = new CopyOnWriteArraySet(); private final Set forcedDurableConsumers = new CopyOnWriteArraySet(); + //Used for proxy network consumers + private final Map networkDemandConsumerMap = new ConcurrentHashMap<>(); private SubscriptionInfo localDurableSubscriber; private NetworkBridgeFilter networkBridgeFilter; @@ -83,6 +87,10 @@ public class DemandSubscription { return durableRemoteSubs; } + public Map getNetworkDemandConsumerMap() { + return networkDemandConsumerMap; + } + /** * @return true if there are no interested consumers */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 42f30a49c4..8d14f7492f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -142,12 +142,8 @@ public class DurableConduitBridge extends ConduitBridge { info.setSubscriptionName(getSubscriberName(info.getDestination())); // and override the consumerId with something unique so that it won't // be removed if the durable subscriber (at the other end) goes away - //Only do this for direct bridge consumers - proxy network consumers we don't - //want to replace the consumerId or cleanup won't happen properly - if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) { - info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), + info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); - } } info.setSelector(null); DemandSubscription demandSubscription = doCreateDemandSubscription(info); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java new file mode 100644 index 0000000000..4a63553480 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.SubscriptionKey; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; + +import com.google.common.collect.Lists; + +import junit.framework.Test; + +/** + * Test to make sure durable subscriptions propagate properly throughout network bridges + * and that conduit subscriptions work properly + */ +public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport { + + private boolean duplex = true; + + @Override + protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { + NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName); + connector.setDynamicallyIncludedDestinations( + Lists. newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true"))); + connector.setDuplex(duplex); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setSyncDurableSubs(true); + connector.setNetworkTTL(-1); + return connector; + } + + public void testDurablePropagationDuplex() throws Exception { + duplex = true; + testDurablePropagation(); + } + + public void testDurablePropagationOneWay() throws Exception { + duplex = false; + testDurablePropagation(); + } + + /** + * BrokerA -> BrokerB -> BrokerC + */ + protected void testDurablePropagation() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + if (!duplex) { + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerC", "BrokerB"); + } + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + sendMessages("BrokerC", dest, 1); + assertNotNull(clientA.receive(1000)); + assertNotNull(clientB.receive(1000)); + + //bring online a consumer on the other side + Session ses2 = createSession("BrokerC"); + MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); + //there will be 2 network durables, 1 for each direction of the bridge + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientB.close(); + clientC.close(); + ses.unsubscribe("subA"); + ses.unsubscribe("subB"); + ses2.unsubscribe("subC"); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + + } + + public void testDurablePropagationConsumerAllBrokersDuplex() throws Exception { + duplex = true; + testDurablePropagationConsumerAllBrokers(); + } + + public void testDurablePropagationConsumerAllBrokersOneWay() throws Exception { + duplex = false; + testDurablePropagationConsumerAllBrokers(); + } + + protected void testDurablePropagationConsumerAllBrokers() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + if (!duplex) { + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerC", "BrokerB"); + } + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + //bring online a consumer on the other side + Session ses2 = createSession("BrokerB"); + MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB"); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + Session ses3 = createSession("BrokerC"); + MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC"); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + + clientA.close(); + clientB.close(); + clientC.close(); + ses.unsubscribe("subA"); + ses2.unsubscribe("subB"); + ses3.unsubscribe("subC"); + + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + + } + + public void testDurablePropagation5BrokerDuplex() throws Exception { + duplex = true; + testDurablePropagation5Broker(); + } + + public void testDurablePropagation5BrokerOneWay() throws Exception { + duplex = false; + testDurablePropagation5Broker(); + } + + protected void testDurablePropagation5Broker() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("BrokerC", "BrokerD"); + bridgeBrokers("BrokerD", "BrokerE"); + if (!duplex) { + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerC", "BrokerB"); + bridgeBrokers("BrokerD", "BrokerC"); + bridgeBrokers("BrokerE", "BrokerD"); + } + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + Thread.sleep(1000); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + sendMessages("BrokerE", dest, 1); + assertNotNull(clientA.receive(1000)); + + //bring online a consumer on the other side + Session ses2 = createSession("BrokerE"); + MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE"); + Thread.sleep(1000); + + //there will be 2 network durables, 1 for each direction of the bridge + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientE.close(); + ses.unsubscribe("subA"); + ses2.unsubscribe("subE"); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0); + + } + + public void testDurablePropagationSpokeDuplex() throws Exception { + duplex = true; + testDurablePropagationSpoke(); + } + + public void testDurablePropagationSpokeOneWay() throws Exception { + duplex = false; + testDurablePropagationSpoke(); + } + + protected void testDurablePropagationSpoke() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("BrokerB", "BrokerD"); + if (!duplex) { + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerC", "BrokerB"); + bridgeBrokers("BrokerD", "BrokerB"); + } + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + Session ses2 = createSession("BrokerB"); + Session ses3 = createSession("BrokerC"); + Session ses4 = createSession("BrokerD"); + + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB"); + Thread.sleep(1000); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD"); + Thread.sleep(1000); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + sendMessages("BrokerA", dest, 1); + assertNotNull(clientD.receive(1000)); + sendMessages("BrokerC", dest, 1); + assertNotNull(clientD.receive(1000)); + + MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB"); + MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC"); + Thread.sleep(1000); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientAB.close(); + clientB.close(); + clientC.close(); + clientD.close(); + + ses.unsubscribe("subA"); + ses.unsubscribe("subAB"); + ses2.unsubscribe("subB"); + ses3.unsubscribe("subC"); + ses4.unsubscribe("subD"); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0); + } + + public void testForceDurablePropagationDuplex() throws Exception { + duplex = true; + testForceDurablePropagation(); + } + + public void testForceDurablePropagationOneWay() throws Exception { + duplex = false; + testForceDurablePropagation(); + } + + protected void testForceDurablePropagation() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + if (!duplex) { + bridgeBrokers("BrokerB", "BrokerA"); + bridgeBrokers("BrokerC", "BrokerB"); + } + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + MessageConsumer clientA = ses.createConsumer(dest); + Thread.sleep(1000); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + sendMessages("BrokerC", dest, 1); + assertNotNull(clientA.receive(1000)); + + Session ses2 = createSession("BrokerC"); + MessageConsumer clientC = ses2.createConsumer(dest); + Thread.sleep(1000); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientC.close(); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + } + + public void testDurablePropagationSyncDuplex() throws Exception { + duplex = true; + testDurablePropagationSync(); + } + + public void testDurablePropagationSyncOneWay() throws Exception { + duplex = false; + testDurablePropagationSync(); + } + + protected void testDurablePropagationSync() throws Exception { + // Setup broker networks + NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB"); + NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC"); + + NetworkConnector nc3 = null; + NetworkConnector nc4 = null; + if (!duplex) { + nc3 = bridgeBrokers("BrokerB", "BrokerA"); + nc4 = bridgeBrokers("BrokerC", "BrokerB"); + } + + startAllBrokers(); + + nc1.stop(); + nc2.stop(); + + if (!duplex) { + nc3.stop(); + nc4.stop(); + } + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + Session ses2 = createSession("BrokerC"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); + MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); + Thread.sleep(1000); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + + nc1.start(); + nc2.start(); + if (!duplex) { + nc3.start(); + nc4.start(); + } + + //there will be 2 network durables, 1 for each direction of the bridge + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientB.close(); + clientC.close(); + } + + public void testDurablePropagationMultipleBridgesDifferentDestinations() throws Exception { + duplex = true; + + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + + //Duplicate the bridges with different included destinations - valid use case + NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB"); + NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC"); + nc3.setName("nc3"); + nc4.setName("nc4"); + nc3.setDynamicallyIncludedDestinations( + Lists. newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true"))); + nc4.setDynamicallyIncludedDestinations( + Lists. newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true"))); + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + Session ses2 = createSession("BrokerC"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa"); + MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); + MessageConsumer clientCc = ses2.createDurableSubscriber(dest2, "subCc"); + Thread.sleep(1000); + + //make sure network durables are online + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1); + + clientA.close(); + clientC.close(); + ses.unsubscribe("subA"); + ses2.unsubscribe("subC"); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1); + + clientAa.close(); + clientCc.close(); + ses.unsubscribe("subAa"); + ses2.unsubscribe("subCc"); + + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0); + } + + protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, + final int count) throws Exception { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return count == getNCDurableSubs(brokerService, dest).size(); + } + }, 10000, 500)); + } + + protected List getNCDurableSubs(final BrokerService brokerService, + final ActiveMQTopic dest) throws Exception { + List subs = new ArrayList<>(); + Destination d = brokerService.getDestination(dest); + org.apache.activemq.broker.region.Topic destination = null; + if (d instanceof DestinationFilter) { + destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class); + } else { + destination = (org.apache.activemq.broker.region.Topic) d; + } + + for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { + if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { + DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); + if (sub != null) { + subs.add(sub); + } + } + } + + return subs; + } + + @Override + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + String options = new String("?persistent=false&useJmx=false"); + createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options)); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options)); + createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options)); + createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options)); + createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options)); + } + + @Override + protected void configureBroker(BrokerService broker) { + broker.setBrokerId(broker.getBrokerName()); + } + + protected Session createSession(String broker) throws Exception { + Connection con = createConnection(broker); + con.start(); + return con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public static Test suite() { + return suite(DurableFiveBrokerNetworkBridgeTest.class); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java deleted file mode 100644 index ff09a1c335..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.network; - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFilter; -import org.apache.activemq.broker.region.DurableTopicSubscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.util.MessageIdList; -import org.apache.activemq.util.SubscriptionKey; -import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; - -import com.google.common.collect.Lists; - -import junit.framework.Test; - -/** - * Test to make sure durable subscriptions propagate properly throughout network bridges - * and that conduit subscriptions work properly - */ -public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport { - - @Override - protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { - NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName); - connector.setDynamicallyIncludedDestinations( - Lists. newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true"))); - connector.setDuplex(true); - connector.setDecreaseNetworkConsumerPriority(false); - connector.setConduitSubscriptions(true); - connector.setSyncDurableSubs(true); - connector.setNetworkTTL(-1); - return connector; - } - - /** - * BrokerA -> BrokerB -> BrokerC - */ - public void testDurablePropagation() throws Exception { - // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); - - startAllBrokers(); - - // Setup destination - ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); - - // Setup consumers - Session ses = createSession("BrokerA"); - MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); - MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); - - // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - - sendMessages("BrokerC", dest, 1); - assertNotNull(clientA.receive(1000)); - assertNotNull(clientB.receive(1000)); - - //bring online a consumer on the other side - Session ses2 = createSession("BrokerC"); - MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); - //there will be 2 network durables, 1 for each direction of the bridge - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); - - clientA.close(); - clientB.close(); - clientC.close(); - ses.unsubscribe("subA"); - ses.unsubscribe("subB"); - ses2.unsubscribe("subC"); - - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); - - } - - public void testForceDurablePropagation() throws Exception { - // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); - - startAllBrokers(); - - // Setup destination - ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); - - // Setup consumers - Session ses = createSession("BrokerA"); - MessageConsumer clientA = ses.createConsumer(dest); - - // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - - sendMessages("BrokerC", dest, 1); - assertNotNull(clientA.receive(1000)); - - Session ses2 = createSession("BrokerC"); - MessageConsumer clientC = ses2.createConsumer(dest); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); - - clientA.close(); - clientC.close(); - - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); - } - - public void testDurablePropagationSync() throws Exception { - // Setup broker networks - NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB"); - NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC"); - - startAllBrokers(); - - nc1.stop(); - nc2.stop(); - - // Setup destination - ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); - - // Setup consumers - Session ses = createSession("BrokerA"); - Session ses2 = createSession("BrokerC"); - MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); - MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); - MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); - - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); - - nc1.start(); - nc2.start(); - - //there will be 2 network durables, 1 for each direction of the bridge - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); - - clientA.close(); - clientB.close(); - clientC.close(); - } - - - protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, - final int count) throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == getNCDurableSubs(brokerService, dest).size(); - } - }, 10000, 500)); - } - - protected List getNCDurableSubs(final BrokerService brokerService, - final ActiveMQTopic dest) throws Exception { - List subs = new ArrayList<>(); - Destination d = brokerService.getDestination(dest); - org.apache.activemq.broker.region.Topic destination = null; - if (d instanceof DestinationFilter) { - destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class); - } else { - destination = (org.apache.activemq.broker.region.Topic) d; - } - - for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { - if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { - DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); - if (sub != null) { - subs.add(sub); - } - } - } - - return subs; - } - - @Override - public void setUp() throws Exception { - super.setAutoFail(true); - super.setUp(); - String options = new String("?persistent=false&useJmx=false"); - createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options)); - createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options)); - createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options)); - } - - @Override - protected void configureBroker(BrokerService broker) { - broker.setBrokerId(broker.getBrokerName()); - } - - protected Session createSession(String broker) throws Exception { - Connection con = createConnection(broker); - con.start(); - return con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - public static Test suite() { - return suite(DurableThreeBrokerNetworkBridgeTest.class); - } -}