fix for broker name not being registered with the network connector

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@520814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-21 09:18:56 +00:00
parent 9f027a6254
commit 54114ccf77
20 changed files with 102 additions and 89 deletions

View File

@ -1084,11 +1084,14 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
try{
Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties());
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config,props,null);
config.setLocalBrokerName(broker.getBrokerName());
IntrospectionSupport.setProperties(config,props,"");
config.setBrokerName(broker.getBrokerName());
Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
localTransport.start();
duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
//now turn duplex off this side
duplexBridge.setCreatedByDuplex(true);
duplexBridge.start();
log.info("Created Duplex Bridge back to " + info.getBrokerName());
}catch(Exception e){
log.error("Creating duplex network bridge",e);
}
@ -1096,6 +1099,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
// We only expect to get one broker info command per connection
if(this.brokerInfo!=null){
log.warn("Unexpected extra broker info command received: "+info);
Thread.dumpStack();
}
this.brokerInfo=info;
broker.addBroker(this,info);

View File

@ -84,7 +84,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
protected ProducerInfo producerInfo;
protected String remoteBrokerName = "Unknown";
protected String localClientId;
protected String name = "bridge";
protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched;
protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
@ -104,6 +103,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
private NetworkBridgeFailedListener bridgeFailedListener;
private boolean createdByDuplex;
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
@ -230,7 +230,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
localClientId="NC_"+remoteBrokerName+"_inbound"+configuration.getBrokerName();
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword());
@ -259,21 +259,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+configuration.getLocalBrokerName()+"_outbound"+name);
remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
if (isCreatedByDuplex()==false) {
BrokerInfo brokerInfo=new BrokerInfo();
brokerInfo.setBrokerName(configuration.getLocalBrokerName());
brokerInfo.setBrokerName(configuration.getBrokerName());
brokerInfo.setNetworkConnection(true);
brokerInfo.setDuplexConnection(configuration.isDuplex());
//set our properties
Properties props = new Properties();
IntrospectionSupport.getProperties(this,props,null);
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
brokerInfo.setNetworkProperties(str);
remoteBroker.oneway(brokerInfo);
}
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
remoteBroker.oneway(remoteSessionInfo);
@ -303,7 +305,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
public void stop() throws Exception{
log.debug(" stopping "+configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
boolean wasDisposedAlready=disposed;
if(!disposed){
try{
@ -321,9 +323,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
}
if(wasDisposedAlready){
log.debug(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
log.debug(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}else{
log.info(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}
}
@ -386,6 +388,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getNetworkTTL();
if(data.getClass()==ConsumerInfo.class){
// Create a new local subscription
@ -393,19 +396,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
BrokerId[] path=info.getBrokerPath();
if((path!=null&&path.length>= networkTTL)){
if(log.isDebugEnabled())
log.debug(configuration.getLocalBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
log.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
return;
}
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isDebugEnabled())
log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
return;
}
if (!isPermissableDestination(info.getDestination())){
//ignore if not in the permited or in the excluded list
if(log.isDebugEnabled())
log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
return;
}
// Update the packet to show where it came from.
@ -415,10 +418,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
if (sub != null){
addSubscription(sub);
if(log.isDebugEnabled())
log.debug(configuration.getLocalBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
log.debug(configuration.getBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
}else {
if(log.isDebugEnabled())
log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
}
}
else if (data.getClass()==DestinationInfo.class){
@ -510,7 +513,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
if(sub!=null){
Message message= configureMessage(md);
if(trace)
log.trace("bridging "+configuration.getLocalBrokerName()+" -> "+remoteBrokerName+": "+message);
log.trace("bridging "+configuration.getBrokerName()+" -> "+remoteBrokerName+": "+message);
@ -550,7 +553,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
log.info(configuration.getLocalBrokerName()+" Shutting down");
log.info(configuration.getBrokerName()+" Shutting down");
// Don't shut down the whole connector if the remote side was interrupted.
// the local transport is just shutting down temporarily until the remote side
// is restored.
@ -643,19 +646,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
public Transport getRemoteBroker() {
return remoteBroker;
}
/**
* @return Returns the name.
* @return the createdByDuplex
*/
public String getName() {
return name;
public boolean isCreatedByDuplex(){
return this.createdByDuplex;
}
/**
* @param name The name to set.
* @param createdByDuplex the createdByDuplex to set
*/
public void setName(String name) {
this.name=name;
public void setCreatedByDuplex(boolean createdByDuplex){
this.createdByDuplex=createdByDuplex;
}
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
@ -687,10 +690,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
protected boolean isPermissableDestination(ActiveMQDestination destination) {
// Are we not bridging temp destinations?
if( destination.isTemporary() && !configuration.isBridgeTempDestinations() )
if( destination.isTemporary() && !configuration.isBridgeTempDestinations() ) {
return false;
}
DestinationFilter filter=DestinationFilter.parseFilter(destination);
ActiveMQDestination[] dests = excludedDestinations;
@ -767,7 +770,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
.getNextSequenceId()));
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
return result;
}
@ -820,5 +822,4 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
l.bridgeFailed();
}
}
}

View File

@ -84,7 +84,6 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
URI connectUri = uri;
log.info("Establishing network connection between from " + localURI + " to " + connectUri);
Transport remoteTransport;
try {
remoteTransport = TransportFactory.connect(connectUri);
@ -204,7 +203,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return configureBridge(result);
}
protected String createName() {
public String getName() {
return discoveryAgent.toString();
}

View File

@ -93,7 +93,7 @@ public class DurableConduitBridge extends ConduitBridge{
}
protected String getSubscriberName(ActiveMQDestination dest){
String subscriberName = configuration.getLocalBrokerName()+"_"+dest.getPhysicalName();
String subscriberName = configuration.getBrokerName()+"_"+dest.getPhysicalName();
return subscriberName;
}

View File

@ -145,7 +145,7 @@ public class MulticastNetworkConnector extends NetworkConnector {
}
}
protected String createName() {
public String getName() {
return remoteTransport.toString();
}

View File

@ -29,7 +29,7 @@ public class NetworkBridgeConfiguration{
private boolean bridgeTempDestinations=true;
private int prefetchSize=1000;
private int networkTTL=1;
private String localBrokerName="Unknow";
private String brokerName="localhost";
private String userName;
private String password;
private String destinationFilter = ">";
@ -128,18 +128,18 @@ public class NetworkBridgeConfiguration{
/**
* @return the localBrokerName
* @return the brokerName
*/
public String getLocalBrokerName(){
return this.localBrokerName;
public String getBrokerName(){
return this.brokerName;
}
/**
* @param localBrokerName the localBrokerName to set
* @param brokerName the localBrokerName to set
*/
public void setLocalBrokerName(String localBrokerName){
this.localBrokerName=localBrokerName;
public void setBrokerName(String brokerName){
this.brokerName=brokerName;
}

View File

@ -14,7 +14,6 @@
package org.apache.activemq.network;
import static org.apache.activemq.network.NetworkConnector.log;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
@ -36,12 +35,10 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
protected static final Log log=LogFactory.getLog(NetworkConnector.class);
protected URI localURI;
private String brokerName="localhost";
private Set durableDestinations;
private List excludedDestinations=new CopyOnWriteArrayList();
private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
private String name="bridge";
protected ConnectionFilter connectionFilter;
protected ServiceSupport serviceSupport=new ServiceSupport(){
@ -69,34 +66,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
this.localURI=localURI;
}
/**
* @return Returns the name.
*/
public String getName(){
if(name==null){
name=createName();
}
return name;
}
/**
* @param name The name to set.
*/
public void setName(String name){
this.name=name;
}
public String getBrokerName(){
return brokerName;
}
/**
* @param brokerName The brokerName to set.
*/
public void setBrokerName(String brokerName){
this.brokerName=brokerName;
}
/**
* @return Returns the durableDestinations.
*/
@ -177,7 +147,6 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
// Implementation methods
// -------------------------------------------------------------------------
protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){
result.setName(getBrokerName());
List destsList=getDynamicallyIncludedDestinations();
ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setDynamicallyIncludedDestinations(dests);
@ -195,8 +164,6 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
return result;
}
protected abstract String createName();
protected Transport createLocalTransport() throws Exception{
return TransportFactory.connect(localURI);
}
@ -209,6 +176,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
serviceSupport.stop();
}
public abstract String getName();
protected void handleStart() throws Exception{
if(localURI==null){
throw new IllegalStateException("You must configure the 'localURI' property");

View File

@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class SimpleDiscoveryAgent implements DiscoveryAgent {
private long initialReconnectDelay = 1000*5;
private long initialReconnectDelay = 1000;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private boolean useExponentialBackOff = false;

View File

@ -256,7 +256,6 @@ public class AMQDeadlockTest3 extends TestCase {
final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
nc.setBridgeTempDestinations(true);
nc.setBrokerName(brokerName);
nc.setName(brokerName + ".nc");
brokerService.addNetworkConnector(nc);
}

View File

@ -126,7 +126,7 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
protected void setUp() throws Exception {
super.setUp();
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName("local");
config.setBrokerName("local");
config.setDispatchAsync(false);
bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport());
bridge.start();

View File

@ -217,8 +217,7 @@ public class AMQDeadlockTestW4Brokers extends TestCase {
"static:" + uri2 + "," + uri3 + "," + uri4));
nc.setBridgeTempDestinations(true);
nc.setBrokerName(brokerName);
nc.setName(brokerName + ".nc");
// When using queue limits set this to 1
nc.setPrefetchSize(1000);
nc.setNetworkTTL(1);

View File

@ -123,7 +123,6 @@ public class AMQFailoverIssue extends TestCase{
final NetworkConnector nc=new DiscoveryNetworkConnector(new URI("static:"+uri2));
nc.setBridgeTempDestinations(true);
nc.setBrokerName(brokerName);
nc.setName(brokerName+".nc");
nc.setPrefetchSize(1);
brokerService.addNetworkConnector(nc);
}

View File

@ -145,7 +145,6 @@ public class AMQStackOverFlowTest extends TestCase {
"static:" + uri2));
nc.setBridgeTempDestinations(true);
nc.setBrokerName(brokerName);
nc.setName(brokerName + ".nc");
nc.setPrefetchSize(1);
brokerService.addNetworkConnector(nc);
}

View File

@ -58,7 +58,7 @@ public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClien
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
config.setBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
bridges.add(bridge);

View File

@ -45,7 +45,7 @@ public class ThreeBrokerQueueNetworkUsingTcpTest extends ThreeBrokerQueueNetwork
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
config.setBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
bridges.add(bridge);

View File

@ -45,7 +45,7 @@ public class ThreeBrokerTopicNetworkUsingTcpTest extends ThreeBrokerTopicNetwork
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
config.setBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
bridges.add(bridge);

View File

@ -117,7 +117,7 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
config.setBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI)) {
protected void serviceLocalCommand(Command command) {

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="localBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)">
duplex = true
dynamicOnly = false
conduitSubscriptions = true
decreaseNetworkConsumerPriority = false
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
</networkConnector>
</networkConnectors>
</broker>
</beans>

View File

@ -24,7 +24,7 @@
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:failover:(tcp://localhost:61617)">
<networkConnector uri="static:(tcp://localhost:61617)">
dynamicOnly = false
conduitSubscriptions = true
decreaseNetworkConsumerPriority = false

View File

@ -23,7 +23,7 @@
<transportConnector uri="tcp://localhost:61617"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:failover:(tcp://localhost:61616)"/>
<networkConnector uri="static:(tcp://localhost:61616)" />
</networkConnectors>
</broker>