Bosanac Dejan 2009-02-18 16:24:56 +00:00
parent 9e3e5ad4cc
commit 8b0bb3373a
8 changed files with 70 additions and 13 deletions

View File

@ -47,12 +47,12 @@ public class ConduitBridge extends DemandForwardingBridge {
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
if (addToAlreadyInterestedConsumers(info)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
info.setSelector(null);
return doCreateDemandSubscription(info);
}

View File

@ -95,6 +95,7 @@ public class DurableConduitBridge extends ConduitBridge {
info.setSubscriptionName(getSubscriberName(info.getDestination()));
}
info.setSelector(null);
return doCreateDemandSubscription(info);
}

View File

@ -75,27 +75,27 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
protected boolean verbose;
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1);
return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1, true);
}
protected void bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1);
bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true);
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL);
return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit);
}
// Overwrite this method to specify how you want to bridge the two brokers
// By default, bridge them using add network connector of the local broker
// and the first connector of the remote broker
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws Exception {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
List transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
@ -103,6 +103,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:" + remoteURI));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
connector.setConduitSubscriptions(conduit);
localBroker.addNetworkConnector(connector);
maxSetupTime = 2000;
return connector;

View File

@ -136,7 +136,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws Exception {
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {

View File

@ -94,8 +94,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
*/
public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();
@ -135,8 +135,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
*/
public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerB", "BrokerA");
bridgeBrokers("BrokerB", "BrokerC");
bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();

View File

@ -109,7 +109,7 @@ public class ThreeBrokerTempQueueNetworkTest extends JmsMultipleBrokersTestSuppo
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL);
NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true);
connector.setBridgeTempDestinations(enableTempDestinationBridging);
return connector;
}

View File

@ -17,11 +17,15 @@
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.HashMap;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.transport.failover.FailoverUriTest;
import org.apache.activemq.util.MessageIdList;
/**
@ -29,6 +33,7 @@ import org.apache.activemq.util.MessageIdList;
*/
public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 100;
public boolean dynamicOnly;
/**
* BrokerA -> BrokerB -> BrokerC
@ -69,6 +74,52 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
}
public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
addCombinationValues("dynamicOnly", new Object[] {true, false});
}
/**
* BrokerA -> BrokerB -> BrokerC
*/
public void testABandBCbrokerNetworkWithSelectors() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, 2, true);
bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, 2, true);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", true);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerC", dest, "dummy = 33");
MessageConsumer clientB = createConsumer("BrokerC", dest, "dummy > 30");
MessageConsumer clientC = createConsumer("BrokerC", dest, "dummy = 34");
// let consumers propogate around the network
Thread.sleep(2000);
// Send messages
// Send messages for broker A
HashMap<String, Object> props = new HashMap<String, Object>();
props.put("dummy", 33);
sendMessages("BrokerA", dest, MESSAGE_COUNT, props);
props.put("dummy", 34);
sendMessages("BrokerA", dest, MESSAGE_COUNT * 2, props);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerC", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerC", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2) ;
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT *2, msgsC.getMessageCount());
}
/**
* BrokerA <- BrokerB -> BrokerC
*/
@ -237,4 +288,8 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
}
public static Test suite() {
return suite(ThreeBrokerTopicNetworkTest.class);
}
}