diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 85a873be49..6b797ac361 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -51,6 +51,7 @@ public abstract class BaseDestination implements Destination { protected SystemUsage systemUsage; protected MemoryUsage memoryUsage; private boolean producerFlowControl = false; + protected boolean warnOnProducerFlowControl = true; private int maxProducersToAudit = 1024; private int maxAuditDepth = 2048; private boolean enableAudit = true; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 2d0d1de0f3..5f1832fa32 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -412,11 +412,15 @@ public class Queue extends BaseDestination implements Task, UsageListener { isFull(context, memoryUsage); fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { - final String logMessage = "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"; - LOG.info(logMessage); + if(warnOnProducerFlowControl) { + warnOnProducerFlowControl = false; + LOG.info("Usage Manager memory limit reached on " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } + if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached"); + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } // We can avoid blocking due to low usage if the producer is sending diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 4ec65e135e..d24bffc5c3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -280,14 +280,20 @@ public class Topic extends BaseDestination implements Task{ if(memoryUsage.isFull()) { isFull(context, memoryUsage); fastProducer(context, producerInfo); + if (isProducerFlowControl() && context.isProducerFlowControl()) { - final String logMessage = "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"; - LOG.info(logMessage); - if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException(logMessage); + + if(warnOnProducerFlowControl) { + warnOnProducerFlowControl = false; + LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } + // We can avoid blocking due to low usage if the producer is sending // a sync message or // if it is using a producer window