From 060817552de9b8f2f1c66a3146f74c51233f943c Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 2 Mar 2018 12:17:24 +0000 Subject: [PATCH] [AMQ-4261] use WARN level for producer flow control events, DEBUG if blockedProducerWarningInterval=0 --- .../activemq/broker/region/BaseDestination.java | 4 +++- .../org/apache/activemq/broker/region/Queue.java | 14 ++++++++++---- .../org/apache/activemq/broker/region/Topic.java | 5 ++++- .../apache/activemq/ProducerFlowControlTest.java | 13 +++++++++++-- .../java/org/apache/activemq/bugs/AMQ6463Test.java | 2 +- .../usecases/TopicProducerFlowControlTest.java | 2 +- 6 files changed, 30 insertions(+), 10 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 672c5b947c..9681fbe4e1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -699,7 +699,9 @@ public abstract class BaseDestination implements Destination { } if (isFlowControlLogRequired()) { - getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); + getLog().warn("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); + } else { + getLog().debug("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); } } long finish = System.currentTimeMillis(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 04488a26d0..ff55e2f546 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -637,9 +637,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { if (isFlowControlLogRequired()) { - LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", + LOG.warn("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); - + } else { + LOG.debug("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", + memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); } if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { ResourceAllocationException resourceAllocationException = sendMemAllocationException; @@ -2083,8 +2085,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } finally { pagedInMessagesLock.writeLock().unlock(); } - } else if (!messages.hasSpace() && isFlowControlLogRequired()) { - LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); + } else if (!messages.hasSpace()) { + if (isFlowControlLogRequired()) { + LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); + } else { + LOG.debug("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); + } } } else { // Avoid return null list, if condition is not validated diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 8b46475004..ff0406ed3f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -383,7 +383,10 @@ public class Topic extends BaseDestination implements Task { if (isProducerFlowControl() && context.isProducerFlowControl()) { if (isFlowControlLogRequired()) { - LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", + LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", + getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); + } else { + LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java index 7cf05d34bc..202d99e00d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java @@ -257,18 +257,26 @@ public class ProducerFlowControlTest extends JmsTestSupport { public void testDisableWarning() throws Exception { final AtomicInteger warnings = new AtomicInteger(); + final AtomicInteger debugs = new AtomicInteger(); + Appender appender = new DefaultTestAppender() { @Override public void doAppend(LoggingEvent event) { - if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager Memory Limit")) { - LOG.info("received log message: " + event.getMessage()); + if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager Memory Limit")) { + LOG.info("received warn log message: " + event.getMessage()); warnings.incrementAndGet(); } + if (event.getLevel().equals(Level.DEBUG) && event.getMessage().toString().contains("Usage Manager Memory Limit")) { + LOG.info("received debug log message: " + event.getMessage()); + debugs.incrementAndGet(); + } + } }; org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(Queue.class); log4jLogger.addAppender(appender); + log4jLogger.setLevel(Level.DEBUG); try { ConnectionFactory factory = createConnectionFactory(); connection = (ActiveMQConnection)factory.createConnection(); @@ -287,6 +295,7 @@ public class ProducerFlowControlTest extends JmsTestSupport { connection.start(); fillQueue(new ActiveMQQueue("SomeOtherQueueToPickUpNewPolicy")); assertEquals(0, warnings.get()); + assertTrue(debugs.get() > 1); } finally { log4jLogger.removeAppender(appender); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java index 96332442bb..e73bd7c086 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6463Test.java @@ -121,7 +121,7 @@ public class AMQ6463Test extends JmsTestSupport { public void doAppend(LoggingEvent event) { if (event.getLevel().equals(Level.ERROR)) { errors.incrementAndGet(); - } else if (event.getLevel().equals(Level.INFO) && event.getRenderedMessage().contains("Usage Manager Memory Limit")) { + } else if (event.getLevel().equals(Level.WARN) && event.getRenderedMessage().contains("Usage Manager Memory Limit")) { gotUsageBlocked.set(true); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java index 1574ec963b..76f54ff517 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java @@ -138,7 +138,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis Appender appender = new DefaultTestAppender() { @Override public void doAppend(LoggingEvent event) { - if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager memory limit reached")) { + if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) { LOG.info("received log message: " + event.getMessage()); warnings.incrementAndGet(); }