AMQ-6858 - Allow configuration of the client Id token in network bridge

The client id token that is used to separate parts of a generated local
client id in a network bridge should be configurable so bridge names and
broker names can contain underscores if desired
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-11-14 14:26:22 -05:00
parent 41211c78d1
commit 3ca439cada
3 changed files with 167 additions and 160 deletions
activemq-broker/src/main/java/org/apache/activemq/network
activemq-unit-tests/src/test/java/org/apache/activemq/network

View File

@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.management.ObjectName;
@ -492,7 +493,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
localClientId = configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + configuration.getBrokerName();
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword());
@ -520,8 +521,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
+ configuration.getBrokerName());
duplexLocalConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + remoteBrokerName + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + "duplex"
+ configuration.getClientIdToken() + configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword());
@ -609,7 +610,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
remoteConnectionInfo = new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
remoteConnectionInfo.setClientId(configuration.getName() + configuration.getClientIdToken() + configuration.getBrokerName() + configuration.getClientIdToken() + "outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
@ -685,7 +686,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (sub != null && path.length > 1 && subName != null) {
String b1 = path[path.length-1].toString();
String b2 = path[path.length-2].toString();
final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + "_inbound_" + b1, subName);
final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
sub.getDurableRemoteSubs().add(newSubInfo);
sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
@ -695,15 +696,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
private String getProxyBridgeClientId(SubscriptionInfo info) {
String[] clientIdTokens = info.getClientId().split("_");
String newClientId = "";
if (clientIdTokens.length > 2) {
for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; j++) {
newClientId += clientIdTokens[j];
if (j < clientIdTokens.length -1) {
newClientId += "_";
}
}
String newClientId = info.getClientId();
String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
if (clientIdTokens != null && clientIdTokens.length > 2) {
newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound"
+ configuration.getClientIdToken() + clientIdTokens[clientIdTokens.length -1];
}
return newClientId;
}

View File

@ -59,6 +59,7 @@ public class NetworkBridgeConfiguration {
private String password;
private String destinationFilter = null;
private String name = "NC";
private String clientIdToken = "_";
protected List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
protected List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
@ -191,6 +192,14 @@ public class NetworkBridgeConfiguration {
this.brokerName = brokerName;
}
public String getClientIdToken() {
return clientIdToken;
}
public void setClientIdToken(String clientIdToken) {
this.clientIdToken = clientIdToken;
}
/**
* @return the networkTTL
*/

View File

@ -57,6 +57,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
connector.setConduitSubscriptions(true);
connector.setSyncDurableSubs(true);
connector.setNetworkTTL(-1);
connector.setClientIdToken("|");
return connector;
}
@ -71,15 +72,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
}
/**
* BrokerA -> BrokerB -> BrokerC
* Broker_A_A -> Broker_B_B -> Broker_C_C
*/
protected void testDurablePropagation() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
if (!duplex) {
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerC", "BrokerB");
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@ -88,26 +89,26 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
sendMessages("BrokerC", dest, 1);
sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientA.receive(1000));
assertNotNull(clientB.receive(1000));
//bring online a consumer on the other side
Session ses2 = createSession("BrokerC");
Session ses2 = createSession("Broker_C_C");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
//there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientB.close();
@ -116,9 +117,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ses.unsubscribe("subB");
ses2.unsubscribe("subC");
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
}
@ -134,11 +135,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagationConsumerAllBrokers() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
if (!duplex) {
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerC", "BrokerB");
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@ -147,28 +148,28 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
//bring online a consumer on the other side
Session ses2 = createSession("BrokerB");
Session ses2 = createSession("Broker_B_B");
MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
Session ses3 = createSession("BrokerC");
Session ses3 = createSession("Broker_C_C");
MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
@ -179,9 +180,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ses3.unsubscribe("subC");
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
}
@ -197,15 +198,15 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagation5Broker() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("BrokerC", "BrokerD");
bridgeBrokers("BrokerD", "BrokerE");
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
bridgeBrokers("Broker_C_C", "Broker_D_D");
bridgeBrokers("Broker_D_D", "Broker_E_E");
if (!duplex) {
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerC", "BrokerB");
bridgeBrokers("BrokerD", "BrokerC");
bridgeBrokers("BrokerE", "BrokerD");
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
bridgeBrokers("Broker_D_D", "Broker_C_C");
bridgeBrokers("Broker_E_E", "Broker_D_D");
}
startAllBrokers();
@ -214,42 +215,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
Thread.sleep(1000);
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
sendMessages("BrokerE", dest, 1);
sendMessages("Broker_E_E", dest, 1);
assertNotNull(clientA.receive(1000));
//bring online a consumer on the other side
Session ses2 = createSession("BrokerE");
Session ses2 = createSession("Broker_E_E");
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
Thread.sleep(1000);
//there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientE.close();
ses.unsubscribe("subA");
ses2.unsubscribe("subE");
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
}
@ -265,13 +266,13 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagationSpoke() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("BrokerB", "BrokerD");
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
bridgeBrokers("Broker_B_B", "Broker_D_D");
if (!duplex) {
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerC", "BrokerB");
bridgeBrokers("BrokerD", "BrokerB");
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
bridgeBrokers("Broker_D_D", "Broker_B_B");
}
startAllBrokers();
@ -280,42 +281,42 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses2 = createSession("BrokerB");
Session ses3 = createSession("BrokerC");
Session ses4 = createSession("BrokerD");
Session ses = createSession("Broker_A_A");
Session ses2 = createSession("Broker_B_B");
Session ses3 = createSession("Broker_C_C");
Session ses4 = createSession("Broker_D_D");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
Thread.sleep(1000);
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
Thread.sleep(1000);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
sendMessages("BrokerA", dest, 1);
sendMessages("Broker_A_A", dest, 1);
assertNotNull(clientD.receive(1000));
sendMessages("BrokerC", dest, 1);
sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientD.receive(1000));
MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
Thread.sleep(1000);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 3);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientAB.close();
@ -329,10 +330,10 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ses3.unsubscribe("subC");
ses4.unsubscribe("subD");
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
}
public void testForceDurablePropagationDuplex() throws Exception {
@ -347,11 +348,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testForceDurablePropagation() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
if (!duplex) {
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerC", "BrokerB");
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@ -360,32 +361,32 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses = createSession("Broker_A_A");
MessageConsumer clientA = ses.createConsumer(dest);
Thread.sleep(1000);
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
sendMessages("BrokerC", dest, 1);
sendMessages("Broker_C_C", dest, 1);
assertNotNull(clientA.receive(1000));
Session ses2 = createSession("BrokerC");
Session ses2 = createSession("Broker_C_C");
MessageConsumer clientC = ses2.createConsumer(dest);
Thread.sleep(1000);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientC.close();
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
}
public void testDurablePropagationSyncDuplex() throws Exception {
@ -400,14 +401,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
protected void testDurablePropagationSync() throws Exception {
// Setup broker networks
NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
NetworkConnector nc1 = bridgeBrokers("Broker_A_A", "Broker_B_B");
NetworkConnector nc2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
NetworkConnector nc3 = null;
NetworkConnector nc4 = null;
if (!duplex) {
nc3 = bridgeBrokers("BrokerB", "BrokerA");
nc4 = bridgeBrokers("BrokerC", "BrokerB");
nc3 = bridgeBrokers("Broker_B_B", "Broker_A_A");
nc4 = bridgeBrokers("Broker_C_C", "Broker_B_B");
}
startAllBrokers();
@ -424,16 +425,16 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses2 = createSession("BrokerC");
Session ses = createSession("Broker_A_A");
Session ses2 = createSession("Broker_C_C");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
Thread.sleep(1000);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
nc1.start();
nc2.start();
@ -443,9 +444,9 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
}
//there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientB.close();
@ -456,14 +457,14 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
duplex = true;
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
//Duplicate the bridges with different included destinations - valid use case
NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB");
NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC");
nc3.setName("nc3");
nc4.setName("nc4");
NetworkConnector nc3 = bridgeBrokers("Broker_A_A", "Broker_B_B");
NetworkConnector nc4 = bridgeBrokers("Broker_B_B", "Broker_C_C");
nc3.setName("nc_3_3");
nc4.setName("nc_4_4");
nc3.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO2?forceDurable=true")));
nc4.setDynamicallyIncludedDestinations(
@ -476,8 +477,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses2 = createSession("BrokerC");
Session ses = createSession("Broker_A_A");
Session ses2 = createSession("Broker_C_C");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
@ -485,33 +486,33 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
Thread.sleep(1000);
//make sure network durables are online
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1);
clientA.close();
clientC.close();
ses.unsubscribe("subA");
ses2.unsubscribe("subC");
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 1);
clientAa.close();
clientCc.close();
ses.unsubscribe("subAa");
ses2.unsubscribe("subCc");
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest2, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest2, 0);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest2, 0);
}
protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
@ -552,11 +553,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
super.setAutoFail(true);
super.setUp();
String options = new String("?persistent=false&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + options));
createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
}
@Override