mirror of https://github.com/apache/activemq.git
fix for https://issues.apache.org/activemq/browse/AMQ-2327 - close proxy consumers
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@795069 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ed0b08bbd9
commit
e5584e6e78
|
@ -18,9 +18,11 @@ package org.apache.activemq.network;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.BrokerId;
|
||||||
import org.apache.activemq.command.ConsumerId;
|
import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.filter.DestinationFilter;
|
import org.apache.activemq.filter.DestinationFilter;
|
||||||
|
@ -55,6 +57,18 @@ public class ConduitBridge extends DemandForwardingBridge {
|
||||||
info.setSelector(null);
|
info.setSelector(null);
|
||||||
return doCreateDemandSubscription(info);
|
return doCreateDemandSubscription(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
|
||||||
|
if (first == null || second == null)
|
||||||
|
return true;
|
||||||
|
if (Arrays.equals(first, second))
|
||||||
|
return true;
|
||||||
|
if (first[0].equals(second[0])
|
||||||
|
&& first[first.length - 1].equals(second[second.length - 1]))
|
||||||
|
return false;
|
||||||
|
else
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
|
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
|
||||||
// search through existing subscriptions and see if we have a match
|
// search through existing subscriptions and see if we have a match
|
||||||
|
@ -62,6 +76,7 @@ public class ConduitBridge extends DemandForwardingBridge {
|
||||||
DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination());
|
DestinationFilter filter = DestinationFilter.parseFilter(info.getDestination());
|
||||||
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
|
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
|
||||||
DemandSubscription ds = (DemandSubscription)i.next();
|
DemandSubscription ds = (DemandSubscription)i.next();
|
||||||
|
|
||||||
if (filter.matches(ds.getLocalInfo().getDestination())) {
|
if (filter.matches(ds.getLocalInfo().getDestination())) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
|
LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
|
||||||
|
@ -69,9 +84,10 @@ public class ConduitBridge extends DemandForwardingBridge {
|
||||||
}
|
}
|
||||||
// add the interest in the subscription
|
// add the interest in the subscription
|
||||||
// ds.add(ds.getRemoteInfo().getConsumerId());
|
// ds.add(ds.getRemoteInfo().getConsumerId());
|
||||||
ds.add(info.getConsumerId());
|
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
|
||||||
|
ds.add(info.getConsumerId());
|
||||||
|
}
|
||||||
matched = true;
|
matched = true;
|
||||||
|
|
||||||
// continue - we want interest to any existing
|
// continue - we want interest to any existing
|
||||||
// DemandSubscriptions
|
// DemandSubscriptions
|
||||||
}
|
}
|
||||||
|
|
|
@ -553,6 +553,8 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
|
|
||||||
|
|
||||||
public void testDuplicateQueueSubs() throws Exception {
|
public void testDuplicateQueueSubs() throws Exception {
|
||||||
|
|
||||||
|
createBroker("BrokerD");
|
||||||
|
|
||||||
bridgeAllBrokers("default", 3, false);
|
bridgeAllBrokers("default", 3, false);
|
||||||
startAllBrokers();
|
startAllBrokers();
|
||||||
|
@ -575,7 +577,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
|
||||||
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
|
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
|
||||||
BrokerService broker = i.next().broker;
|
BrokerService broker = i.next().broker;
|
||||||
if (!brokerName.equals(broker.getBrokerName())) {
|
if (!brokerName.equals(broker.getBrokerName())) {
|
||||||
verifyConsumerCount(broker, 2, dest);
|
verifyConsumerCount(broker, 3, dest);
|
||||||
verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
|
verifyConsumePriority(broker, ConsumerInfo.NORMAL_PRIORITY, dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue