diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index f99740294d..761952bd15 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -270,7 +270,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public synchronized String toString() { - return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" + return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 36c1e09aa4..386ef59657 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1048,7 +1048,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); if (!networkConsumers.isEmpty()) { if (matchFound(candidateConsumers, networkConsumers)) { - suppress = isActiveDurableSub(sub) && hasLowerPriority(sub, candidate.getLocalInfo()); + if (isInActiveDurableSub(sub)) { + suppress = false; + } else { + suppress = hasLowerPriority(sub, candidate.getLocalInfo()); + } break; } } @@ -1056,8 +1060,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return suppress; } - private boolean isActiveDurableSub(Subscription sub) { - return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && ((DurableTopicSubscription)sub).isActive()); + private boolean isInActiveDurableSub(Subscription sub) { + return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive()); } private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { @@ -1067,7 +1071,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " - + existingSub.getConsumerInfo() + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); + + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); } suppress = true; } else { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java index d52ac8f2e8..ffff3f31df 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -117,7 +117,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { private BrokerService createAndStartBroker(String name, String addr) throws Exception { BrokerService broker = new BrokerService(); - //broker.setDeleteAllMessagesOnStartup(true); + broker.setDeleteAllMessagesOnStartup(true); broker.setBrokerName(name); broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT)); broker.setUseJmx(false);