The NetworkConnector was not removing the old bridge for the map of bridges that he was mantinaing,
so when the remote broker came back up, then the bridge was not restarted.

Also add a new ConnectionFilter interface which allows the broker to avoid establishing loopback connections.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386507 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-17 02:24:29 +00:00
parent d77e665ed7
commit 47853d3584
4 changed files with 73 additions and 4 deletions

View File

@ -47,6 +47,7 @@ import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector;
@ -224,6 +225,24 @@ public class BrokerService implements Service {
map.put("network", "true");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
connector.setLocalUri(uri);
// Set a connection filter so that the connector does not establish loop back connections.
connector.setConnectionFilter(new ConnectionFilter() {
public boolean connectTo(URI location) {
List transportConnectors = getTransportConnectors();
for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) {
try {
TransportConnector tc = (TransportConnector) iter.next();
if( location.equals(tc.getConnectUri()) ) {
return false;
}
} catch (Throwable e) {
}
}
return true;
}
});
networkConnectors.add(connector);
if (isUseJmx()) {
registerNetworkConnectorMBean(connector);

View File

@ -0,0 +1,16 @@
package org.apache.activemq.network;
import java.net.URI;
/**
* Abstraction that allows you to control which brokers a NetworkConnector connects bridges to.
*
* @version $Revision$
*/
public interface ConnectionFilter {
/**
* @param location
* @return true if the network connector should establish a connection to the specified location.
*/
boolean connectTo(URI location);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.network;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@ -44,7 +45,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
private DiscoveryAgent discoveryAgent;
private ConcurrentHashMap bridges = new ConcurrentHashMap();
public DiscoveryNetworkConnector() {
}
@ -69,8 +70,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
// Has it allready been added?
if (bridges.containsKey(uri) || localURI.equals(uri))
// Should we try to connect to that URI?
if ( bridges.containsKey(uri)
|| localURI.equals(uri)
|| (connectionFilter!=null && !connectionFilter.connectTo(uri))
)
return;
URI connectUri = uri;
@ -131,7 +135,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
Bridge bridge = (Bridge) bridges.get(uri);
Bridge bridge = (Bridge) bridges.remove(uri);
if (bridge == null)
return;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.network;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@ -50,6 +51,9 @@ public abstract class NetworkConnector extends ServiceSupport {
private boolean decreaseNetworkConsumerPriority;
private int networkTTL = 1;
private String name = "bridge";
private int prefetchSize = 1000;
private boolean dispatchAsync = true;
protected ConnectionFilter connectionFilter;
public NetworkConnector() {
}
@ -234,6 +238,8 @@ public abstract class NetworkConnector extends ServiceSupport {
result.setLocalBrokerName(getBrokerName());
result.setName(getBrokerName());
result.setNetworkTTL(getNetworkTTL());
result.setPrefetchSize(prefetchSize);
result.setDispatchAsync(dispatchAsync);
result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
List destsList = getDynamicallyIncludedDestinations();
@ -272,4 +278,28 @@ public abstract class NetworkConnector extends ServiceSupport {
protected Transport createLocalTransport() throws Exception {
return TransportFactory.connect(localURI);
}
public boolean isDispatchAsync() {
return dispatchAsync;
}
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync = dispatchAsync;
}
public int getPrefetchSize() {
return prefetchSize;
}
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
}
public ConnectionFilter getConnectionFilter() {
return connectionFilter;
}
public void setConnectionFilter(ConnectionFilter connectionFilter) {
this.connectionFilter = connectionFilter;
}
}