diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 95eb6cb0de..82282706c6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.Collection; +import java.util.List; import javax.jms.ResourceAllocationException; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; @@ -89,6 +90,7 @@ public abstract class BaseDestination implements Destination { private boolean prioritizedMessages; private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean gcIfInactive; + private boolean gcWithNetworkConsumers; private long lastActiveTime=0l; private boolean reduceMemoryFootprint = false; @@ -243,7 +245,12 @@ public abstract class BaseDestination implements Destination { } public boolean isActive() { - return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; + boolean isActive = destinationStatistics.getConsumers().getCount() != 0 || + destinationStatistics.getProducers().getCount() != 0; + if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) { + isActive = hasRegularConsumers(getConsumers()); + } + return isActive; } public int getMaxPageSize() { @@ -650,7 +657,19 @@ public abstract class BaseDestination implements Destination { public void setGcIfInactive(boolean gcIfInactive) { this.gcIfInactive = gcIfInactive; } - + + /** + * Indicate if it is ok to gc destinations that have only network consumers + * @param gcWithNetworkConsumers + */ + public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { + this.gcWithNetworkConsumers = gcWithNetworkConsumers; + } + + public boolean isGcWithNetworkConsumers() { + return gcWithNetworkConsumers; + } + public void markForGC(long timeStamp) { if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) { @@ -676,7 +695,9 @@ public abstract class BaseDestination implements Destination { return this.reduceMemoryFootprint; } - protected boolean hasRegularConsumers(Collection consumers) { + public abstract List getConsumers(); + + protected boolean hasRegularConsumers(List consumers) { boolean hasRegularConsumers = false; for (Subscription subscription: consumers) { if (!subscription.getConsumerInfo().isNetworkSubscription()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java index d6c3714766..dfbdf86ded 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -90,15 +90,4 @@ public class TempQueue extends Queue{ } super.dispose(context); } - - @Override - public boolean isActive() { - boolean isActive = super.isActive(); - if (isActive && brokerService.isAllowTempAutoCreationOnSend()) { - synchronized (consumers) { - isActive = hasRegularConsumers(consumers); - } - } - return isActive; - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java index f1de77f6e8..746eea0680 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java @@ -68,15 +68,4 @@ public class TempTopic extends Topic implements Task{ public void initialize() { } - - @Override - public boolean isActive() { - boolean isActive = super.isActive(); - if (isActive && brokerService.isAllowTempAutoCreationOnSend()) { - synchronized (consumers) { - isActive = hasRegularConsumers(consumers); - } - } - return isActive; - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index adace80d63..9155ce4575 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -90,10 +90,11 @@ public class PolicyEntry extends DestinationMapEntry { private boolean prioritizedMessages; private boolean allConsumersExclusiveByDefault; private boolean gcInactiveDestinations; + private boolean gcWithNetworkConsumers; private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean reduceMemoryFootprint; - - + + public void configure(Broker broker,Queue queue) { baseConfiguration(broker,queue); if (dispatchPolicy != null) { @@ -163,6 +164,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setSlowConsumerStrategy(scs); destination.setPrioritizedMessages(isPrioritizedMessages()); destination.setGcIfInactive(isGcInactiveDestinations()); + destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC()); destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); } @@ -787,7 +789,15 @@ public class PolicyEntry extends DestinationMapEntry { public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC; } - + + public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { + this.gcWithNetworkConsumers = gcWithNetworkConsumers; + } + + public boolean isGcWithNetworkConsumers() { + return gcWithNetworkConsumers; + } + public boolean isReduceMemoryFootprint() { return reduceMemoryFootprint; } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java index 24cbf1db30..dc3432514e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java @@ -264,6 +264,7 @@ public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSup PolicyEntry tempReplyQPolicy = new PolicyEntry(); tempReplyQPolicy.setOptimizedDispatch(true); tempReplyQPolicy.setGcInactiveDestinations(true); + tempReplyQPolicy.setGcWithNetworkConsumers(true); tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000); map.put(replyQWildcard, tempReplyQPolicy); broker.setDestinationPolicy(map);