From 43bd686a2c651fea3f1db4ad2cd164140f26554e Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 13 Oct 2009 15:30:50 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2448 - failed network bridge being incorrectly recorded in a list (and leaked), issue introduced by https://issues.apache.org/activemq/browse/AMQ-2298 now resolved by reusing exsiting correctly managed list of bridges git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@824807 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/DiscoveryNetworkConnector.java | 2 +- .../activemq/network/NetworkConnector.java | 41 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 92f748b77e..9d856bd495 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -48,7 +48,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco private static final Log LOG = LogFactory.getLog(DiscoveryNetworkConnector.class); private DiscoveryAgent discoveryAgent; - private ConcurrentHashMap bridges = new ConcurrentHashMap(); + private Map parameters; public DiscoveryNetworkConnector() { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index 318ef1ad5b..275bd07316 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -16,6 +16,19 @@ */ package org.apache.activemq.network; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AnnotatedMBean; @@ -30,17 +43,6 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; /** * @version $Revision$ @@ -50,6 +52,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem private static final Log LOG = LogFactory.getLog(NetworkConnector.class); protected URI localURI; protected ConnectionFilter connectionFilter; + protected ConcurrentHashMap bridges = new ConcurrentHashMap(); + protected ServiceSupport serviceSupport = new ServiceSupport() { protected void doStart() throws Exception { @@ -67,8 +71,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem private List staticallyIncludedDestinations = new CopyOnWriteArrayList(); private BrokerService brokerService; private ObjectName objectName; - private ConcurrentLinkedQueue configuredBridges = new ConcurrentLinkedQueue(); - + public NetworkConnector() { } @@ -187,7 +190,6 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem dest = (ActiveMQDestination[])topics.toArray(dest); result.setDurableDestinations(dest); } - configuredBridges.add(result); return result; } @@ -268,10 +270,13 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem // ask all the bridges as we can't know to which this consumer is tied public boolean removeDemandSubscription(ConsumerId consumerId) { boolean removeSucceeded = false; - for (DemandForwardingBridgeSupport bridge: configuredBridges) { - if (bridge.removeDemandSubscriptionByLocalId(consumerId)) { - removeSucceeded = true; - break; + for (NetworkBridge bridge : bridges.values()) { + if (bridge instanceof DemandForwardingBridgeSupport) { + DemandForwardingBridgeSupport demandBridge = (DemandForwardingBridgeSupport) bridge; + if (demandBridge.removeDemandSubscriptionByLocalId(consumerId)) { + removeSucceeded = true; + break; + } } } return removeSucceeded;