Ensure we detect Connection splits for any type of network

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637609 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-16 16:30:55 +00:00
parent 1987cfa234
commit b32ff4dd5c
3 changed files with 31 additions and 25 deletions

View File

@ -44,39 +44,39 @@ public class ConnectionSplitBroker extends BrokerFilter{
}
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception{
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
ActiveMQDestination dest = info.getDestination();
boolean validDestination = dest != null && !dest.isTemporary();
if (validDestination) {
synchronized (networkConsumerList) {
if (info.isNetworkSubscription()) {
networkConsumerList.add(info);
} else {
if(!networkConsumerList.isEmpty()) {
List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
for (ConsumerInfo nc : networkConsumerList) {
if (!nc.isNetworkConsumersEmpty()) {
for (ConsumerId id : nc.getNetworkConsumerIds()) {
if (id.equals(info.getConsumerId())) {
nc.removeNetworkConsumerId(id);
if (nc.isNetworkConsumersEmpty()) {
gcList.add(nc);
}
synchronized (networkConsumerList) {
if (info.isNetworkSubscription()) {
networkConsumerList.add(info);
} else {
if (!networkConsumerList.isEmpty()) {
List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
for (ConsumerInfo nc : networkConsumerList) {
if (!nc.isNetworkConsumersEmpty()) {
for (ConsumerId id : nc.getNetworkConsumerIds()) {
if (id.equals(info.getConsumerId())) {
nc.removeNetworkConsumerId(id);
if (nc.isNetworkConsumersEmpty()) {
gcList.add(nc);
}
}
} else {
gcList.add(nc);
}
}
for (ConsumerInfo nc : gcList) {
networkConsumerList.remove(nc);
super.removeConsumer(context, nc);
LOG.warn("Removed stale network consumer " + nc);
}
}
for (ConsumerInfo nc : gcList) {
networkConsumerList.remove(nc);
super.removeConsumer(context, nc);
LOG.warn("Removed stale network consumer " + nc);
}
}
}
}
return super.addConsumer(context, info);
}

View File

@ -51,6 +51,8 @@ public class ConduitBridge extends DemandForwardingBridge {
if (addToAlreadyInterestedConsumers(info)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}

View File

@ -820,11 +820,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
return result;
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}
protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
DemandSubscription result = new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
@ -905,6 +908,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
protected abstract BrokerId[] getRemoteBrokerPath();