https://issues.apache.org/jira/browse/AMQ-2911: Option to make all consumers retroactive. Patch applied and with thanks and an additional test. new destination policy entry 'alwaysRetroactive'.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1145217 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-07-11 16:02:52 +00:00
parent 5967d76c85
commit fe63c29bcc
9 changed files with 60 additions and 3 deletions

View File

@ -339,6 +339,14 @@ public class DestinationView implements DestinationViewMBean {
public void setProducerFlowControl(boolean producerFlowControl) {
destination.setProducerFlowControl(producerFlowControl);
}
public boolean isAlwaysRetroactive() {
return destination.isAlwaysRetroactive();
}
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
destination.setAlwaysRetroactive(alwaysRetroactive);
}
/**
* Set's the interval at which warnings about producers being blocked by

View File

@ -253,6 +253,17 @@ public interface DestinationViewMBean {
*/
public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl);
/**
* @return if we treat consumers as alwaysRetroactive
*/
@MBeanInfo("Always treat consumers as retroActive")
boolean isAlwaysRetroactive();
/**
* @param alwaysRetroactive set as always retroActive
*/
public void setAlwaysRetroactive(@MBeanInfo("alwaysRetroactive") boolean alwaysRetroactive);
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable

View File

@ -63,6 +63,7 @@ public abstract class BaseDestination implements Destination {
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private boolean alwaysRetroactive = false;
protected boolean warnOnProducerFlowControl = true;
protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
@ -147,6 +148,14 @@ public abstract class BaseDestination implements Destination {
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
}
public boolean isAlwaysRetroactive() {
return alwaysRetroactive;
}
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
this.alwaysRetroactive = alwaysRetroactive;
}
/**
* Set's the interval at which warnings about producers being blocked by

View File

@ -77,6 +77,10 @@ public interface Destination extends Service, Task {
boolean isProducerFlowControl();
void setProducerFlowControl(boolean value);
boolean isAlwaysRetroactive();
void setAlwaysRetroactive(boolean value);
/**
* Set's the interval at which warnings about producers being blocked by

View File

@ -137,6 +137,14 @@ public class DestinationFilter implements Destination {
public void setProducerFlowControl(boolean value) {
next.setProducerFlowControl(value);
}
public boolean isAlwaysRetroactive() {
return next.isAlwaysRetroactive();
}
public void setAlwaysRetroactive(boolean value) {
next.setAlwaysRetroactive(value);
}
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);

View File

@ -117,7 +117,7 @@ public class Topic extends BaseDestination implements Task {
if (!sub.getConsumerInfo().isDurable()) {
// Do a retroactive recovery if needed.
if (sub.getConsumerInfo().isRetroactive()) {
if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
// synchronize with dispatch method so that no new messages are
// sent

View File

@ -61,6 +61,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private boolean alwaysRetroactive = false;
private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
@ -140,6 +141,7 @@ public class PolicyEntry extends DestinationMapEntry {
public void baseConfiguration(Broker broker,BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl());
destination.setAlwaysRetroactive(isAlwaysRetroactive());
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
@ -411,6 +413,21 @@ public class PolicyEntry extends DestinationMapEntry {
this.producerFlowControl = producerFlowControl;
}
/**
* @return true if topic is always retroactive
*/
public boolean isAlwaysRetroactive() {
return alwaysRetroactive;
}
/**
* @param alwaysRetroactive
*/
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
this.alwaysRetroactive = alwaysRetroactive;
}
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable

View File

@ -88,7 +88,7 @@ public class RetroactiveConsumerWithMessageQueryTest extends EmbeddedBrokerTestS
protected ConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress);
answer.setUseRetroactiveConsumer(true);
//answer.setUseRetroactiveConsumer(true);
return answer;
}

View File

@ -32,7 +32,7 @@
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.apache.activemq.test.>">
<policyEntry topic="org.apache.activemq.test.>" alwaysRetroactive="true" >
<subscriptionRecoveryPolicy>
<queryBasedSubscriptionRecoveryPolicy query="#myQuery" />
</subscriptionRecoveryPolicy>