From 1c121bcded2c426c3e424c797069cda3042bd9d6 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Thu, 24 Sep 2009 16:32:07 +0000 Subject: [PATCH] merging revisions 817222:817842 git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@818540 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/BaseDestination.java | 1 + .../apache/activemq/broker/region/Queue.java | 26 ++++++++++++++++--- .../apache/activemq/broker/region/Topic.java | 24 +++++++++++++---- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5501fd1f06..a5cfda3953 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -51,6 +51,7 @@ public abstract class BaseDestination implements Destination { protected SystemUsage systemUsage; protected MemoryUsage memoryUsage; private boolean producerFlowControl = true; + protected boolean warnOnProducerFlowControl = true; private int maxProducersToAudit = 1024; private int maxAuditDepth = 2048; private boolean enableAudit = true; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index f51959aab1..f9be9f7e48 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -413,8 +413,15 @@ public class Queue extends BaseDestination implements Task, UsageListener { isFull(context, memoryUsage); fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { + if(warnOnProducerFlowControl) { + warnOnProducerFlowControl = false; + LOG.info("Usage Manager memory limit reached on " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } + if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached"); + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } // We can avoid blocking due to low usage if the producer is sending @@ -498,8 +505,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { final ConnectionContext context = producerExchange.getConnectionContext(); synchronized (sendLock) { if (store != null && message.isPersistent()) { - if (systemUsage.isSendFailIfNoSpace() && systemUsage.getStoreUsage().isFull()) { - throw new javax.jms.ResourceAllocationException("Usage Manager Store is Full"); + if (systemUsage.getStoreUsage().isFull()) { + final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + LOG.info(logMessage); + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException(logMessage); + } } while (!systemUsage.getStoreUsage().waitForSpace(1000)) { if (context.getStopping().get()) { @@ -1279,6 +1291,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { final void sendMessage(final ConnectionContext context, Message msg) throws Exception { if (!msg.isPersistent() && messages.getSystemUsage() != null) { + if (systemUsage.getTempUsage().isFull()) { + final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + LOG.info(logMessage); + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException(logMessage); + } + } messages.getSystemUsage().getTempUsage().waitForSpace(); } synchronized(messages) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index c1035ddcf8..d24bffc5c3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -280,11 +280,20 @@ public class Topic extends BaseDestination implements Task{ if(memoryUsage.isFull()) { isFull(context, memoryUsage); fastProducer(context, producerInfo); + if (isProducerFlowControl() && context.isProducerFlowControl()) { - if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); + + if(warnOnProducerFlowControl) { + warnOnProducerFlowControl = false; + LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } + // We can avoid blocking due to low usage if the producer is sending // a sync message or // if it is using a producer window @@ -390,8 +399,13 @@ public class Topic extends BaseDestination implements Task{ if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { - if (systemUsage.isSendFailIfNoSpace() && systemUsage.getStoreUsage().isFull()) { - throw new javax.jms.ResourceAllocationException("Usage Manager Store is Full"); + if (systemUsage.getStoreUsage().isFull()) { + final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + LOG.info(logMessage); + if (systemUsage.isSendFailIfNoSpace()) { + throw new javax.jms.ResourceAllocationException(logMessage); + } } while (!systemUsage.getStoreUsage().waitForSpace(1000)) { if (context.getStopping().get()) {