[AMQ-4261] use WARN level for producer flow control events, DEBUG if blockedProducerWarningInterval=0

This commit is contained in:
gtully 2018-03-02 12:17:24 +00:00
parent b57f4f3211
commit 060817552d
6 changed files with 30 additions and 10 deletions

View File

@ -699,7 +699,9 @@ public abstract class BaseDestination implements Destination {
}
if (isFlowControlLogRequired()) {
getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
getLog().warn("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
} else {
getLog().debug("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
}
}
long finish = System.currentTimeMillis();

View File

@ -637,9 +637,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (isFlowControlLogRequired()) {
LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
LOG.warn("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
} else {
LOG.debug("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
ResourceAllocationException resourceAllocationException = sendMemAllocationException;
@ -2083,8 +2085,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} finally {
pagedInMessagesLock.writeLock().unlock();
}
} else if (!messages.hasSpace() && isFlowControlLogRequired()) {
LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
} else if (!messages.hasSpace()) {
if (isFlowControlLogRequired()) {
LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
} else {
LOG.debug("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage());
}
}
} else {
// Avoid return null list, if condition is not validated

View File

@ -383,7 +383,10 @@ public class Topic extends BaseDestination implements Task {
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (isFlowControlLogRequired()) {
LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
} else {
LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
}

View File

@ -257,18 +257,26 @@ public class ProducerFlowControlTest extends JmsTestSupport {
public void testDisableWarning() throws Exception {
final AtomicInteger warnings = new AtomicInteger();
final AtomicInteger debugs = new AtomicInteger();
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager Memory Limit")) {
LOG.info("received log message: " + event.getMessage());
if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager Memory Limit")) {
LOG.info("received warn log message: " + event.getMessage());
warnings.incrementAndGet();
}
if (event.getLevel().equals(Level.DEBUG) && event.getMessage().toString().contains("Usage Manager Memory Limit")) {
LOG.info("received debug log message: " + event.getMessage());
debugs.incrementAndGet();
}
}
};
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(Queue.class);
log4jLogger.addAppender(appender);
log4jLogger.setLevel(Level.DEBUG);
try {
ConnectionFactory factory = createConnectionFactory();
connection = (ActiveMQConnection)factory.createConnection();
@ -287,6 +295,7 @@ public class ProducerFlowControlTest extends JmsTestSupport {
connection.start();
fillQueue(new ActiveMQQueue("SomeOtherQueueToPickUpNewPolicy"));
assertEquals(0, warnings.get());
assertTrue(debugs.get() > 1);
} finally {
log4jLogger.removeAppender(appender);

View File

@ -121,7 +121,7 @@ public class AMQ6463Test extends JmsTestSupport {
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.ERROR)) {
errors.incrementAndGet();
} else if (event.getLevel().equals(Level.INFO) && event.getRenderedMessage().contains("Usage Manager Memory Limit")) {
} else if (event.getLevel().equals(Level.WARN) && event.getRenderedMessage().contains("Usage Manager Memory Limit")) {
gotUsageBlocked.set(true);
}
}

View File

@ -138,7 +138,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
LOG.info("received log message: " + event.getMessage());
warnings.incrementAndGet();
}