mirror of https://github.com/apache/activemq.git
resolve https://issues.apache.org/activemq/browse/AMQ-2448 - failed network bridge being incorrectly recorded in a list (and leaked), issue introduced by https://issues.apache.org/activemq/browse/AMQ-2298 now resolved by reusing exsiting correctly managed list of bridges
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@824807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a84bbc1de8
commit
43bd686a2c
|
@ -48,7 +48,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
|||
private static final Log LOG = LogFactory.getLog(DiscoveryNetworkConnector.class);
|
||||
|
||||
private DiscoveryAgent discoveryAgent;
|
||||
private ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
|
||||
|
||||
private Map<String, String> parameters;
|
||||
|
||||
public DiscoveryNetworkConnector() {
|
||||
|
|
|
@ -16,6 +16,19 @@
|
|||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.AnnotatedMBean;
|
||||
|
@ -30,17 +43,6 @@ import org.apache.activemq.util.ServiceStopper;
|
|||
import org.apache.activemq.util.ServiceSupport;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
|
@ -50,6 +52,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
private static final Log LOG = LogFactory.getLog(NetworkConnector.class);
|
||||
protected URI localURI;
|
||||
protected ConnectionFilter connectionFilter;
|
||||
protected ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
|
||||
|
||||
protected ServiceSupport serviceSupport = new ServiceSupport() {
|
||||
|
||||
protected void doStart() throws Exception {
|
||||
|
@ -67,8 +71,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
|
||||
private BrokerService brokerService;
|
||||
private ObjectName objectName;
|
||||
private ConcurrentLinkedQueue<DemandForwardingBridgeSupport> configuredBridges = new ConcurrentLinkedQueue<DemandForwardingBridgeSupport>();
|
||||
|
||||
|
||||
public NetworkConnector() {
|
||||
}
|
||||
|
||||
|
@ -187,7 +190,6 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
dest = (ActiveMQDestination[])topics.toArray(dest);
|
||||
result.setDurableDestinations(dest);
|
||||
}
|
||||
configuredBridges.add(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -268,10 +270,13 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
// ask all the bridges as we can't know to which this consumer is tied
|
||||
public boolean removeDemandSubscription(ConsumerId consumerId) {
|
||||
boolean removeSucceeded = false;
|
||||
for (DemandForwardingBridgeSupport bridge: configuredBridges) {
|
||||
if (bridge.removeDemandSubscriptionByLocalId(consumerId)) {
|
||||
removeSucceeded = true;
|
||||
break;
|
||||
for (NetworkBridge bridge : bridges.values()) {
|
||||
if (bridge instanceof DemandForwardingBridgeSupport) {
|
||||
DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge;
|
||||
if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) {
|
||||
removeSucceeded = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return removeSucceeded;
|
||||
|
|
Loading…
Reference in New Issue