From 39d6321a4a2e9f929ba9cc451cafa8f244e3d716 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 14 Nov 2017 14:26:22 -0500 Subject: [PATCH] AMQ-6858 - Allow configuration of the client Id token in network bridge The client id token that is used to separate parts of a generated local client id in a network bridge should be configurable so bridge names and broker names can contain underscores if desired (cherry picked from commit 3ca439cadaa8c40f506aa1ef683b36624d216254) --- .../DemandForwardingBridgeSupport.java | 25 +- .../network/NetworkBridgeConfiguration.java | 9 + .../DurableFiveBrokerNetworkBridgeTest.java | 293 +++++++++--------- 3 files changed, 167 insertions(+), 160 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 03e79e4b56..75084d10db 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import javax.management.ObjectName; @@ -492,7 +493,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localConnectionInfo = new ConnectionInfo(); localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); + localClientId = configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + configuration.getBrokerName(); localConnectionInfo.setClientId(localClientId); localConnectionInfo.setUserName(configuration.getUserName()); localConnectionInfo.setPassword(configuration.getPassword()); @@ -520,8 +521,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" - + configuration.getBrokerName()); + duplexLocalConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + "duplex" + + configuration.getClientIdToken() + configuration.getBrokerName()); duplexLocalConnectionInfo.setUserName(configuration.getUserName()); duplexLocalConnectionInfo.setPassword(configuration.getPassword()); @@ -609,7 +610,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } remoteConnectionInfo = new ConnectionInfo(); remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); + remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound"); remoteConnectionInfo.setUserName(configuration.getUserName()); remoteConnectionInfo.setPassword(configuration.getPassword()); remoteBroker.oneway(remoteConnectionInfo); @@ -685,7 +686,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (sub != null && path.length > 1 && subName != null) { String b1 = path[path.length-1].toString(); String b2 = path[path.length-2].toString(); - final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName); + final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName); sub.getDurableRemoteSubs().add(newSubInfo); sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet(); LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo); @@ -695,15 +696,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } private String getProxyBridgeClientId(SubscriptionInfo info) { - String[] clientIdTokens = info.getClientId().split("_"); - String newClientId = ""; - if (clientIdTokens.length > 2) { - for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) { - newClientId += clientIdTokens[j]; - if (j < clientIdTokens.length -1) { - newClientId += "_"; - } - } + String newClientId = info.getClientId(); + String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null; + if (clientIdTokens != null && clientIdTokens.length > 2) { + newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound" + + configuration.getClientIdToken() + clientIdTokens[clientIdTokens.length -1]; } return newClientId; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index b2ca78ad8c..3c64758ddd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -59,6 +59,7 @@ public class NetworkBridgeConfiguration { private String password; private String destinationFilter = null; private String name = "NC"; + private String clientIdToken = "_"; protected List excludedDestinations = new CopyOnWriteArrayList(); protected List dynamicallyIncludedDestinations = new CopyOnWriteArrayList(); @@ -191,6 +192,14 @@ public class NetworkBridgeConfiguration { this.brokerName = brokerName; } + public String getClientIdToken() { + return clientIdToken; + } + + public void setClientIdToken(String clientIdToken) { + this.clientIdToken = clientIdToken; + } + /** * @return the networkTTL */ diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java index 4a63553480..94d73936f4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java @@ -57,6 +57,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu connector.setConduitSubscriptions(true); connector.setSyncDurableSubs(true); connector.setNetworkTTL(-1); + connector.setClientIdToken("|"); return connector; } @@ -71,15 +72,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu } /** - * BrokerA -> BrokerB -> BrokerC + * Broker_A_A -> Broker_B_B -> Broker_C_C */ protected void testDurablePropagation() throws Exception { // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); if (!duplex) { - bridgeBrokers("BrokerB", "BrokerA"); - bridgeBrokers("BrokerC", "BrokerB"); + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); } startAllBrokers(); @@ -88,26 +89,26 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); // Setup consumers - Session ses = createSession("BrokerA"); + Session ses = createSession("Broker_A_A"); MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); - sendMessages("BrokerC", dest, 1); + sendMessages("Broker_C_C", dest, 1); assertNotNull(clientA.receive(1000)); assertNotNull(clientB.receive(1000)); //bring online a consumer on the other side - Session ses2 = createSession("BrokerC"); + Session ses2 = createSession("Broker_C_C"); MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); //there will be 2 network durables, 1 for each direction of the bridge - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); clientA.close(); clientB.close(); @@ -116,9 +117,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ses.unsubscribe("subB"); ses2.unsubscribe("subC"); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); } @@ -134,11 +135,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu protected void testDurablePropagationConsumerAllBrokers() throws Exception { // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); if (!duplex) { - bridgeBrokers("BrokerB", "BrokerA"); - bridgeBrokers("BrokerC", "BrokerB"); + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); } startAllBrokers(); @@ -147,28 +148,28 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); // Setup consumers - Session ses = createSession("BrokerA"); + Session ses = createSession("Broker_A_A"); MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); //bring online a consumer on the other side - Session ses2 = createSession("BrokerB"); + Session ses2 = createSession("Broker_B_B"); MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB"); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); - Session ses3 = createSession("BrokerC"); + Session ses3 = createSession("Broker_C_C"); MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC"); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); clientA.close(); @@ -179,9 +180,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ses3.unsubscribe("subC"); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); } @@ -197,15 +198,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu protected void testDurablePropagation5Broker() throws Exception { // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); - bridgeBrokers("BrokerC", "BrokerD"); - bridgeBrokers("BrokerD", "BrokerE"); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); + bridgeBrokers("Broker_C_C", "Broker_D_D"); + bridgeBrokers("Broker_D_D", "Broker_E_E"); if (!duplex) { - bridgeBrokers("BrokerB", "BrokerA"); - bridgeBrokers("BrokerC", "BrokerB"); - bridgeBrokers("BrokerD", "BrokerC"); - bridgeBrokers("BrokerE", "BrokerD"); + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); + bridgeBrokers("Broker_D_D", "Broker_C_C"); + bridgeBrokers("Broker_E_E", "Broker_D_D"); } startAllBrokers(); @@ -214,42 +215,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); // Setup consumers - Session ses = createSession("BrokerA"); + Session ses = createSession("Broker_A_A"); MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); Thread.sleep(1000); // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); - sendMessages("BrokerE", dest, 1); + sendMessages("Broker_E_E", dest, 1); assertNotNull(clientA.receive(1000)); //bring online a consumer on the other side - Session ses2 = createSession("BrokerE"); + Session ses2 = createSession("Broker_E_E"); MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE"); Thread.sleep(1000); //there will be 2 network durables, 1 for each direction of the bridge - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); clientA.close(); clientE.close(); ses.unsubscribe("subA"); ses2.unsubscribe("subE"); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0); } @@ -265,13 +266,13 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu protected void testDurablePropagationSpoke() throws Exception { // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); - bridgeBrokers("BrokerB", "BrokerD"); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); + bridgeBrokers("Broker_B_B", "Broker_D_D"); if (!duplex) { - bridgeBrokers("BrokerB", "BrokerA"); - bridgeBrokers("BrokerC", "BrokerB"); - bridgeBrokers("BrokerD", "BrokerB"); + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); + bridgeBrokers("Broker_D_D", "Broker_B_B"); } startAllBrokers(); @@ -280,42 +281,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); // Setup consumers - Session ses = createSession("BrokerA"); - Session ses2 = createSession("BrokerB"); - Session ses3 = createSession("BrokerC"); - Session ses4 = createSession("BrokerD"); + Session ses = createSession("Broker_A_A"); + Session ses2 = createSession("Broker_B_B"); + Session ses3 = createSession("Broker_C_C"); + Session ses4 = createSession("Broker_D_D"); MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB"); Thread.sleep(1000); // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD"); Thread.sleep(1000); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); - sendMessages("BrokerA", dest, 1); + sendMessages("Broker_A_A", dest, 1); assertNotNull(clientD.receive(1000)); - sendMessages("BrokerC", dest, 1); + sendMessages("Broker_C_C", dest, 1); assertNotNull(clientD.receive(1000)); MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB"); MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC"); Thread.sleep(1000); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 3); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); clientA.close(); clientAB.close(); @@ -329,10 +330,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ses3.unsubscribe("subC"); ses4.unsubscribe("subD"); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0); } public void testForceDurablePropagationDuplex() throws Exception { @@ -347,11 +348,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu protected void testForceDurablePropagation() throws Exception { // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); if (!duplex) { - bridgeBrokers("BrokerB", "BrokerA"); - bridgeBrokers("BrokerC", "BrokerB"); + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); } startAllBrokers(); @@ -360,32 +361,32 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); // Setup consumers - Session ses = createSession("BrokerA"); + Session ses = createSession("Broker_A_A"); MessageConsumer clientA = ses.createConsumer(dest); Thread.sleep(1000); // let consumers propagate around the network - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); - sendMessages("BrokerC", dest, 1); + sendMessages("Broker_C_C", dest, 1); assertNotNull(clientA.receive(1000)); - Session ses2 = createSession("BrokerC"); + Session ses2 = createSession("Broker_C_C"); MessageConsumer clientC = ses2.createConsumer(dest); Thread.sleep(1000); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); clientA.close(); clientC.close(); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); } public void testDurablePropagationSyncDuplex() throws Exception { @@ -400,14 +401,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu protected void testDurablePropagationSync() throws Exception { // Setup broker networks - NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB"); - NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC"); + NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B"); + NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C"); NetworkConnector nc3 = null; NetworkConnector nc4 = null; if (!duplex) { - nc3 = bridgeBrokers("BrokerB", "BrokerA"); - nc4 = bridgeBrokers("BrokerC", "BrokerB"); + nc3 = bridgeBrokers("Broker_B_B", "Broker_A_A"); + nc4 = bridgeBrokers("Broker_C_C", "Broker_B_B"); } startAllBrokers(); @@ -424,16 +425,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); // Setup consumers - Session ses = createSession("BrokerA"); - Session ses2 = createSession("BrokerC"); + Session ses = createSession("Broker_A_A"); + Session ses2 = createSession("Broker_C_C"); MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); Thread.sleep(1000); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); nc1.start(); nc2.start(); @@ -443,9 +444,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu } //there will be 2 network durables, 1 for each direction of the bridge - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); clientA.close(); clientB.close(); @@ -456,14 +457,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu duplex = true; // Setup broker networks - bridgeBrokers("BrokerA", "BrokerB"); - bridgeBrokers("BrokerB", "BrokerC"); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); //Duplicate the bridges with different included destinations - valid use case - NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB"); - NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC"); - nc3.setName("nc3"); - nc4.setName("nc4"); + NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B"); + NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C"); + nc3.setName("nc_3_3"); + nc4.setName("nc_4_4"); nc3.setDynamicallyIncludedDestinations( Lists. newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true"))); nc4.setDynamicallyIncludedDestinations( @@ -476,8 +477,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true); // Setup consumers - Session ses = createSession("BrokerA"); - Session ses2 = createSession("BrokerC"); + Session ses = createSession("Broker_A_A"); + Session ses2 = createSession("Broker_C_C"); MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa"); MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); @@ -485,33 +486,33 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu Thread.sleep(1000); //make sure network durables are online - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1); clientA.close(); clientC.close(); ses.unsubscribe("subA"); ses2.unsubscribe("subC"); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1); clientAa.close(); clientCc.close(); ses.unsubscribe("subAa"); ses2.unsubscribe("subCc"); - assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0); - assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0); - assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 0); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 0); } protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, @@ -552,11 +553,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu super.setAutoFail(true); super.setUp(); String options = new String("?persistent=false&useJmx=false"); - createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options)); - createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options)); - createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options)); - createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options)); - createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options)); + createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options)); + createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options)); + createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options)); + createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options)); + createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options)); } @Override