mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@433673 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
07c1e65c22
commit
6a12e9cdeb
|
@ -85,6 +85,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
|||
protected int prefetchSize = 1000;
|
||||
protected boolean dispatchAsync;
|
||||
protected String destinationFilter = ">";
|
||||
protected boolean bridgeTempDestinations = false;
|
||||
protected String name = "bridge";
|
||||
protected ConsumerInfo demandConsumerInfo;
|
||||
protected int demandConsumerDispatched;
|
||||
|
@ -240,7 +241,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
|||
if(remoteBridgeStarted.compareAndSet(false,true)) {
|
||||
|
||||
synchronized (this) {
|
||||
|
||||
|
||||
if( remoteConnectionInfo!=null ) {
|
||||
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
|
||||
}
|
||||
|
@ -270,12 +271,16 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
|||
+destinationFilter));
|
||||
demandConsumerInfo.setPrefetchSize(prefetchSize);
|
||||
remoteBroker.oneway(demandConsumerInfo);
|
||||
//we want information about Destinations as well
|
||||
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
|
||||
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
|
||||
destinationInfo.setPrefetchSize(prefetchSize);
|
||||
destinationInfo.setDispatchAsync(dispatchAsync);
|
||||
remoteBroker.oneway(destinationInfo);
|
||||
|
||||
if( bridgeTempDestinations ) {
|
||||
//we want information about Destinations as well
|
||||
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
|
||||
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
|
||||
destinationInfo.setPrefetchSize(prefetchSize);
|
||||
destinationInfo.setDispatchAsync(dispatchAsync);
|
||||
remoteBroker.oneway(destinationInfo);
|
||||
}
|
||||
|
||||
startedLatch.countDown();
|
||||
|
||||
if (!disposed){
|
||||
|
@ -729,6 +734,11 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
|||
}
|
||||
|
||||
protected boolean isPermissableDestination(ActiveMQDestination destination) {
|
||||
|
||||
// Are we not bridging temp destinations?
|
||||
if( destination.isTemporary() && !bridgeTempDestinations )
|
||||
return false;
|
||||
|
||||
DestinationFilter filter=DestinationFilter.parseFilter(destination);
|
||||
ActiveMQDestination[] dests = excludedDestinations;
|
||||
if(dests!=null&&dests.length>0){
|
||||
|
@ -863,4 +873,12 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
|||
this.userName = userName;
|
||||
}
|
||||
|
||||
public boolean isBridgeTempDestinations() {
|
||||
return bridgeTempDestinations;
|
||||
}
|
||||
|
||||
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
|
||||
this.bridgeTempDestinations = bridgeTempDestinations;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -55,6 +55,8 @@ public abstract class NetworkConnector extends ServiceSupport {
|
|||
private boolean dispatchAsync = true;
|
||||
private String userName;
|
||||
private String password;
|
||||
private boolean bridgeTempDestinations=false;
|
||||
|
||||
protected ConnectionFilter connectionFilter;
|
||||
|
||||
public NetworkConnector() {
|
||||
|
@ -257,6 +259,8 @@ public abstract class NetworkConnector extends ServiceSupport {
|
|||
destsList = getStaticallyIncludedDestinations();
|
||||
dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
|
||||
result.setStaticallyIncludedDestinations(dests);
|
||||
|
||||
result.setBridgeTempDestinations(bridgeTempDestinations);
|
||||
|
||||
if (durableDestinations != null) {
|
||||
ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
|
||||
|
@ -322,4 +326,12 @@ public abstract class NetworkConnector extends ServiceSupport {
|
|||
public void setUserName(String userName) {
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
public boolean isBridgeTempDestinations() {
|
||||
return bridgeTempDestinations;
|
||||
}
|
||||
|
||||
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
|
||||
this.bridgeTempDestinations = bridgeTempDestinations;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,8 +62,8 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
protected boolean persistentDelivery = true;
|
||||
protected boolean verbose = false;
|
||||
|
||||
protected void bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
|
||||
bridgeBrokers(localBrokerName,remoteBrokerName,false,1);
|
||||
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
|
||||
return bridgeBrokers(localBrokerName,remoteBrokerName,false,1);
|
||||
}
|
||||
|
||||
|
||||
|
@ -74,18 +74,18 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
bridgeBrokers(localBroker, remoteBroker,dynamicOnly,1);
|
||||
}
|
||||
|
||||
protected void bridgeBrokers(String localBrokerName, String remoteBrokerName,boolean dynamicOnly, int networkTTL) throws Exception {
|
||||
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName,boolean dynamicOnly, int networkTTL) throws Exception {
|
||||
BrokerService localBroker = ((BrokerItem)brokers.get(localBrokerName)).broker;
|
||||
BrokerService remoteBroker = ((BrokerItem)brokers.get(remoteBrokerName)).broker;
|
||||
|
||||
bridgeBrokers(localBroker, remoteBroker,dynamicOnly,networkTTL);
|
||||
return bridgeBrokers(localBroker, remoteBroker,dynamicOnly,networkTTL);
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Overwrite this method to specify how you want to bridge the two brokers
|
||||
// By default, bridge them using add network connector of the local broker and the first connector of the remote broker
|
||||
protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,boolean dynamicOnly, int networkTTL) throws Exception {
|
||||
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,boolean dynamicOnly, int networkTTL) throws Exception {
|
||||
List transportConnectors = remoteBroker.getTransportConnectors();
|
||||
URI remoteURI;
|
||||
if (!transportConnectors.isEmpty()) {
|
||||
|
@ -94,11 +94,12 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
connector.setDynamicOnly(dynamicOnly);
|
||||
connector.setNetworkTTL(networkTTL);
|
||||
localBroker.addNetworkConnector(connector);
|
||||
MAX_SETUP_TIME = 2000;
|
||||
return connector;
|
||||
} else {
|
||||
throw new Exception("Remote broker has no registered connectors.");
|
||||
}
|
||||
|
||||
MAX_SETUP_TIME = 2000;
|
||||
}
|
||||
|
||||
// This will interconnect all brokes using multicast
|
||||
|
|
|
@ -19,15 +19,19 @@ package org.apache.activemq.usecases;
|
|||
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
|
||||
import org.apache.activemq.JmsMultipleBrokersTestSupport;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
/**
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public class ThreeBrokerTempQueueNetworkTest extends JmsMultipleBrokersTestSupport{
|
||||
protected static final int MESSAGE_COUNT=100;
|
||||
boolean enableTempDestinationBridging = true;
|
||||
|
||||
/**
|
||||
* BrokerA -> BrokerB -> BrokerC
|
||||
|
@ -88,6 +92,17 @@ public class ThreeBrokerTempQueueNetworkTest extends JmsMultipleBrokersTestSuppo
|
|||
.getTemporaryQueues().length);
|
||||
}
|
||||
}
|
||||
|
||||
public void testTempDisable() throws Exception{
|
||||
enableTempDestinationBridging=false;
|
||||
try {
|
||||
testTempQueueCleanup();
|
||||
} catch (Throwable e) {
|
||||
// Expecting an error
|
||||
return;
|
||||
}
|
||||
fail("Test should have failed since temp queues are disabled.");
|
||||
}
|
||||
|
||||
public void setUp() throws Exception{
|
||||
super.setAutoFail(true);
|
||||
|
@ -96,4 +111,11 @@ public class ThreeBrokerTempQueueNetworkTest extends JmsMultipleBrokersTestSuppo
|
|||
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=true"));
|
||||
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=true"));
|
||||
}
|
||||
|
||||
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
|
||||
NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL);
|
||||
connector.setBridgeTempDestinations(enableTempDestinationBridging);
|
||||
return connector;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue