mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations in a network without advisories. Allow gc of regular destinations that have just network consumers, hub/spoke case with dynamic real reply destinations. policy entry: GcWithNetworkConsumers now applicable to all destinations. addition to https://issues.apache.org/jira/browse/AMQ-2571
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1088557 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
254d9209f5
commit
d55a3922c9
|
@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import javax.jms.ResourceAllocationException;
|
import javax.jms.ResourceAllocationException;
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
|
@ -89,6 +90,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
private boolean prioritizedMessages;
|
private boolean prioritizedMessages;
|
||||||
private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
||||||
private boolean gcIfInactive;
|
private boolean gcIfInactive;
|
||||||
|
private boolean gcWithNetworkConsumers;
|
||||||
private long lastActiveTime=0l;
|
private long lastActiveTime=0l;
|
||||||
private boolean reduceMemoryFootprint = false;
|
private boolean reduceMemoryFootprint = false;
|
||||||
|
|
||||||
|
@ -243,7 +245,12 @@ public abstract class BaseDestination implements Destination {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isActive() {
|
public boolean isActive() {
|
||||||
return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
|
boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
|
||||||
|
destinationStatistics.getProducers().getCount() != 0;
|
||||||
|
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
|
||||||
|
isActive = hasRegularConsumers(getConsumers());
|
||||||
|
}
|
||||||
|
return isActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaxPageSize() {
|
public int getMaxPageSize() {
|
||||||
|
@ -651,6 +658,18 @@ public abstract class BaseDestination implements Destination {
|
||||||
this.gcIfInactive = gcIfInactive;
|
this.gcIfInactive = gcIfInactive;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicate if it is ok to gc destinations that have only network consumers
|
||||||
|
* @param gcWithNetworkConsumers
|
||||||
|
*/
|
||||||
|
public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
|
||||||
|
this.gcWithNetworkConsumers = gcWithNetworkConsumers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isGcWithNetworkConsumers() {
|
||||||
|
return gcWithNetworkConsumers;
|
||||||
|
}
|
||||||
|
|
||||||
public void markForGC(long timeStamp) {
|
public void markForGC(long timeStamp) {
|
||||||
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
|
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
|
||||||
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
|
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
|
||||||
|
@ -676,7 +695,9 @@ public abstract class BaseDestination implements Destination {
|
||||||
return this.reduceMemoryFootprint;
|
return this.reduceMemoryFootprint;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean hasRegularConsumers(Collection<Subscription> consumers) {
|
public abstract List<Subscription> getConsumers();
|
||||||
|
|
||||||
|
protected boolean hasRegularConsumers(List<Subscription> consumers) {
|
||||||
boolean hasRegularConsumers = false;
|
boolean hasRegularConsumers = false;
|
||||||
for (Subscription subscription: consumers) {
|
for (Subscription subscription: consumers) {
|
||||||
if (!subscription.getConsumerInfo().isNetworkSubscription()) {
|
if (!subscription.getConsumerInfo().isNetworkSubscription()) {
|
||||||
|
|
|
@ -90,15 +90,4 @@ public class TempQueue extends Queue{
|
||||||
}
|
}
|
||||||
super.dispose(context);
|
super.dispose(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isActive() {
|
|
||||||
boolean isActive = super.isActive();
|
|
||||||
if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
|
|
||||||
synchronized (consumers) {
|
|
||||||
isActive = hasRegularConsumers(consumers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return isActive;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,15 +68,4 @@ public class TempTopic extends Topic implements Task{
|
||||||
|
|
||||||
public void initialize() {
|
public void initialize() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isActive() {
|
|
||||||
boolean isActive = super.isActive();
|
|
||||||
if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
|
|
||||||
synchronized (consumers) {
|
|
||||||
isActive = hasRegularConsumers(consumers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return isActive;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,6 +90,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
private boolean prioritizedMessages;
|
private boolean prioritizedMessages;
|
||||||
private boolean allConsumersExclusiveByDefault;
|
private boolean allConsumersExclusiveByDefault;
|
||||||
private boolean gcInactiveDestinations;
|
private boolean gcInactiveDestinations;
|
||||||
|
private boolean gcWithNetworkConsumers;
|
||||||
private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
||||||
private boolean reduceMemoryFootprint;
|
private boolean reduceMemoryFootprint;
|
||||||
|
|
||||||
|
@ -163,6 +164,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
destination.setSlowConsumerStrategy(scs);
|
destination.setSlowConsumerStrategy(scs);
|
||||||
destination.setPrioritizedMessages(isPrioritizedMessages());
|
destination.setPrioritizedMessages(isPrioritizedMessages());
|
||||||
destination.setGcIfInactive(isGcInactiveDestinations());
|
destination.setGcIfInactive(isGcInactiveDestinations());
|
||||||
|
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
|
||||||
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
|
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
|
||||||
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
|
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
|
||||||
}
|
}
|
||||||
|
@ -788,6 +790,14 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
|
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
|
||||||
|
this.gcWithNetworkConsumers = gcWithNetworkConsumers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isGcWithNetworkConsumers() {
|
||||||
|
return gcWithNetworkConsumers;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isReduceMemoryFootprint() {
|
public boolean isReduceMemoryFootprint() {
|
||||||
return reduceMemoryFootprint;
|
return reduceMemoryFootprint;
|
||||||
}
|
}
|
||||||
|
|
|
@ -264,6 +264,7 @@ public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSup
|
||||||
PolicyEntry tempReplyQPolicy = new PolicyEntry();
|
PolicyEntry tempReplyQPolicy = new PolicyEntry();
|
||||||
tempReplyQPolicy.setOptimizedDispatch(true);
|
tempReplyQPolicy.setOptimizedDispatch(true);
|
||||||
tempReplyQPolicy.setGcInactiveDestinations(true);
|
tempReplyQPolicy.setGcInactiveDestinations(true);
|
||||||
|
tempReplyQPolicy.setGcWithNetworkConsumers(true);
|
||||||
tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
|
tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
|
||||||
map.put(replyQWildcard, tempReplyQPolicy);
|
map.put(replyQWildcard, tempReplyQPolicy);
|
||||||
broker.setDestinationPolicy(map);
|
broker.setDestinationPolicy(map);
|
||||||
|
|
Loading…
Reference in New Issue