From b32ff4dd5ce8ee3fab764cf290d27bd57c9213ed Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Sun, 16 Mar 2008 16:30:55 +0000 Subject: [PATCH] Ensure we detect Connection splits for any type of network git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637609 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/cluster/ConnectionSplitBroker.java | 48 +++++++++---------- .../activemq/network/ConduitBridge.java | 2 + .../DemandForwardingBridgeSupport.java | 6 ++- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java index cd7a065c1e..8ec9c35543 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java @@ -44,39 +44,39 @@ public class ConnectionSplitBroker extends BrokerFilter{ } - public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception{ + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) + throws Exception { ActiveMQDestination dest = info.getDestination(); - boolean validDestination = dest != null && !dest.isTemporary(); - if (validDestination) { - synchronized (networkConsumerList) { - if (info.isNetworkSubscription()) { - networkConsumerList.add(info); - } else { - if(!networkConsumerList.isEmpty()) { - List gcList = new ArrayList(); - for (ConsumerInfo nc : networkConsumerList) { - if (!nc.isNetworkConsumersEmpty()) { - for (ConsumerId id : nc.getNetworkConsumerIds()) { - if (id.equals(info.getConsumerId())) { - nc.removeNetworkConsumerId(id); - if (nc.isNetworkConsumersEmpty()) { - gcList.add(nc); - } + + synchronized (networkConsumerList) { + if (info.isNetworkSubscription()) { + networkConsumerList.add(info); + } else { + if (!networkConsumerList.isEmpty()) { + List gcList = new ArrayList(); + for (ConsumerInfo nc : networkConsumerList) { + if (!nc.isNetworkConsumersEmpty()) { + + for (ConsumerId id : nc.getNetworkConsumerIds()) { + + if (id.equals(info.getConsumerId())) { + nc.removeNetworkConsumerId(id); + if (nc.isNetworkConsumersEmpty()) { + gcList.add(nc); } } - } else { - gcList.add(nc); } } - for (ConsumerInfo nc : gcList) { - networkConsumerList.remove(nc); - super.removeConsumer(context, nc); - LOG.warn("Removed stale network consumer " + nc); - } + } + for (ConsumerInfo nc : gcList) { + networkConsumerList.remove(nc); + super.removeConsumer(context, nc); + LOG.warn("Removed stale network consumer " + nc); } } } } + return super.addConsumer(context, info); } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java index d0fefa9942..b74fe844ea 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -51,6 +51,8 @@ public class ConduitBridge extends DemandForwardingBridge { if (addToAlreadyInterestedConsumers(info)) { return null; // don't want this subscription added } + //add our original id to ourselves + info.addNetworkConsumerId(info.getConsumerId()); return doCreateDemandSubscription(info); } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index bf7327ab50..6ebb177742 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -820,11 +820,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { } return result; } - + protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { + //add our original id to ourselves + info.addNetworkConsumerId(info.getConsumerId()); return doCreateDemandSubscription(info); } + protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { DemandSubscription result = new DemandSubscription(info); result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); @@ -905,6 +908,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; + protected abstract BrokerId[] getRemoteBrokerPath();