diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b6b49d8fd3..4467574ab4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -412,6 +412,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { isFull(context, memoryUsage); fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { + final String logMessage = "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + LOG.info(logMessage); if (systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached"); } @@ -497,9 +500,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { final ConnectionContext context = producerExchange.getConnectionContext(); synchronized (sendLock) { if (store != null && message.isPersistent()) { - if (systemUsage.isSendFailIfNoSpace() && systemUsage.getStoreUsage().isFull()) { - throw new javax.jms.ResourceAllocationException("Usage Manager Store is Full"); - } + if (systemUsage.getStoreUsage().isFull()) { + final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + LOG.info(logMessage); + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException(logMessage); + } + } while (!systemUsage.getStoreUsage().waitForSpace(1000)) { if (context.getStopping().get()) { throw new IOException( @@ -1278,6 +1286,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { final void sendMessage(final ConnectionContext context, Message msg) throws Exception { if (!msg.isPersistent() && messages.getSystemUsage() != null) { + if (systemUsage.getTempUsage().isFull()) { + final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + LOG.info(logMessage); + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException(logMessage); + } + } messages.getSystemUsage().getTempUsage().waitForSpace(); } synchronized(messages) {