mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-2327 - fix regression in org.apache.activemq.usecases.TwoBrokerTopicSendReceiveLotsOfMessagesUsingTcpTest - statically included and dynaically included topic of same name. Need to conduit statically included (local networkSubs)
This commit is contained in:
parent
f609c50fd0
commit
a80a1857d0
|
@ -64,7 +64,7 @@ public class ConduitBridge extends DemandForwardingBridge {
|
|||
|
||||
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
|
||||
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
|
||||
if (!ds.getRemoteInfo().isNetworkSubscription() && filter.matches(info.getDestination())) {
|
||||
if (canConduit(ds) && filter.matches(info.getDestination())) {
|
||||
LOG.debug("{} {} with ids {} matched (add interest) {}", new Object[]{
|
||||
configuration.getBrokerName(), info, info.getNetworkConsumerIds(), ds
|
||||
});
|
||||
|
@ -81,6 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge {
|
|||
return matched;
|
||||
}
|
||||
|
||||
// we want to conduit statically included consumers which are local networkSubs
|
||||
// but we don't want to conduit remote network subs i.e. (proxy proxy) consumers
|
||||
private boolean canConduit(DemandSubscription ds) {
|
||||
return ds.isStaticallyIncluded() || !ds.getRemoteInfo().isNetworkSubscription();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removeDemandSubscription(ConsumerId id) throws IOException {
|
||||
List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
|
||||
|
|
|
@ -1103,6 +1103,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
if (dests != null) {
|
||||
for (ActiveMQDestination dest : dests) {
|
||||
DemandSubscription sub = createDemandSubscription(dest);
|
||||
sub.setStaticallyIncluded(true);
|
||||
try {
|
||||
addSubscription(sub);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -44,6 +44,7 @@ public class DemandSubscription {
|
|||
private SubscriptionInfo localDurableSubscriber;
|
||||
|
||||
private NetworkBridgeFilter networkBridgeFilter;
|
||||
private boolean staticallyIncluded;
|
||||
|
||||
DemandSubscription(ConsumerInfo info) {
|
||||
remoteInfo = info;
|
||||
|
@ -155,4 +156,12 @@ public class DemandSubscription {
|
|||
public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) {
|
||||
this.localDurableSubscriber = localDurableSubscriber;
|
||||
}
|
||||
|
||||
public boolean isStaticallyIncluded() {
|
||||
return staticallyIncluded;
|
||||
}
|
||||
|
||||
public void setStaticallyIncluded(boolean staticallyIncluded) {
|
||||
this.staticallyIncluded = staticallyIncluded;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue