mirror of https://github.com/apache/activemq.git
add property optimizeMessageStoreInFlightLimit on destinations so can set the inflight limit above which optimize store dispatch is no longer applied - for https://issues.apache.org/jira/browse/AMQ-3750
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1302887 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f32792a9fc
commit
ca83305faa
|
@ -99,6 +99,10 @@ public abstract class BaseDestination implements Destination {
|
|||
protected final Scheduler scheduler;
|
||||
private boolean disposed = false;
|
||||
private boolean doOptimzeMessageStorage = true;
|
||||
/*
|
||||
* percentage of in-flight messages above which optimize message store is disabled
|
||||
*/
|
||||
private int optimizeMessageStoreInFlightLimit = 10;
|
||||
|
||||
/**
|
||||
* @param brokerService
|
||||
|
@ -723,6 +727,14 @@ public abstract class BaseDestination implements Destination {
|
|||
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
|
||||
}
|
||||
|
||||
public int getOptimizeMessageStoreInFlightLimit() {
|
||||
return optimizeMessageStoreInFlightLimit;
|
||||
}
|
||||
|
||||
public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
|
||||
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
|
||||
}
|
||||
|
||||
|
||||
public abstract List<Subscription> getConsumers();
|
||||
|
||||
|
|
|
@ -2164,7 +2164,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
result = false;
|
||||
break;
|
||||
}
|
||||
if (s.getInFlightUsage() > 10){
|
||||
if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
|
||||
result = false;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -707,7 +707,7 @@ public class Topic extends BaseDestination implements Task {
|
|||
result = false;
|
||||
break;
|
||||
}
|
||||
if (s.getInFlightUsage() > 10){
|
||||
if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
|
||||
result = false;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -97,6 +97,10 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private boolean reduceMemoryFootprint;
|
||||
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
|
||||
private boolean doOptimzeMessageStorage = true;
|
||||
/*
|
||||
* percentage of in-flight messages above which optimize message store is disabled
|
||||
*/
|
||||
private int optimizeMessageStoreInFlightLimit = 10;
|
||||
|
||||
|
||||
public void configure(Broker broker,Queue queue) {
|
||||
|
@ -173,6 +177,8 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
|
||||
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
|
||||
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
|
||||
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
|
||||
|
||||
}
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||
|
@ -842,4 +848,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
|
||||
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
|
||||
}
|
||||
|
||||
public int getOptimizeMessageStoreInFlightLimit() {
|
||||
return optimizeMessageStoreInFlightLimit;
|
||||
}
|
||||
|
||||
public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
|
||||
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue