[AMQ-3233] respect policy entry blockedProducerWarningInterval for flow control warning, 0 disables and Xmillis makes it periodic, default period of 30s is not unlike the existing once behaviour. fix and tests

This commit is contained in:
gtully 2017-03-03 11:23:23 +00:00
parent 2809befff5
commit 01b1f7f694
5 changed files with 133 additions and 49 deletions

View File

@ -68,7 +68,7 @@ public abstract class BaseDestination implements Destination {
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private boolean alwaysRetroactive = false;
protected boolean warnOnProducerFlowControl = true;
protected long lastBlockedProducerWarnTime = 0l;
protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
private int maxProducersToAudit = 1024;
@ -683,7 +683,6 @@ public abstract class BaseDestination implements Destination {
}
} else {
long start = System.currentTimeMillis();
long nextWarn = start;
producerBrokerExchange.blockingOnFlowControl(true);
destinationStatistics.getBlockedSends().increment();
while (!usage.waitForSpace(1000, highWaterMark)) {
@ -691,10 +690,8 @@ public abstract class BaseDestination implements Destination {
throw new IOException("Connection closed, send aborted.");
}
long now = System.currentTimeMillis();
if (now >= nextWarn) {
getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))});
nextWarn = now + blockedProducerWarningInterval;
if (isFlowControlLogRequired()) {
getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
}
}
long finish = System.currentTimeMillis();
@ -705,6 +702,18 @@ public abstract class BaseDestination implements Destination {
}
}
protected boolean isFlowControlLogRequired() {
boolean answer = false;
if (blockedProducerWarningInterval > 0) {
long now = System.currentTimeMillis();
if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) {
lastBlockedProducerWarnTime = now;
answer = true;
}
}
return answer;
}
protected abstract Logger getLog();
public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {

View File

@ -628,12 +628,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
if (isFlowControlLogRequired()) {
LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
}
memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "

View File

@ -382,8 +382,7 @@ public class Topic extends BaseDestination implements Task {
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
if (isFlowControlLogRequired()) {
LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
}

View File

@ -20,7 +20,9 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@ -31,12 +33,18 @@ import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -247,6 +255,44 @@ public class ProducerFlowControlTest extends JmsTestSupport {
assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
}
public void testDisableWarning() throws Exception {
final AtomicInteger warnings = new AtomicInteger();
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager Memory Limit")) {
LOG.info("received log message: " + event.getMessage());
warnings.incrementAndGet();
}
}
};
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(Queue.class);
log4jLogger.addAppender(appender);
try {
ConnectionFactory factory = createConnectionFactory();
connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection);
connection.start();
fillQueue(queueB);
assertEquals(1, warnings.get());
broker.getDestinationPolicy().getDefaultEntry().setBlockedProducerWarningInterval(0);
warnings.set(0);
// new connection b/c other is blocked
connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection);
connection.start();
fillQueue(new ActiveMQQueue("SomeOtherQueueToPickUpNewPolicy"));
assertEquals(0, warnings.get());
} finally {
log4jLogger.removeAppender(appender);
}
}
private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
@ -334,7 +380,9 @@ public class ProducerFlowControlTest extends JmsTestSupport {
}
protected void tearDown() throws Exception {
if (connection != null) {
for (Connection c : connections) {
// force error on blocked connections
ActiveMQConnection connection = (ActiveMQConnection) c;
TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
t.getTransportListener().onException(new IOException("Disposed."));
connection.getTransport().stop();

View File

@ -33,10 +33,15 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +75,7 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
tpe.setMemoryLimit(destinationMemLimit);
tpe.setProducerFlowControl(true);
tpe.setAdvisoryWhenFull(true);
tpe.setBlockedProducerWarningInterval(2000);
pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe}));
@ -128,49 +134,72 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
}
});
// Start producing the test messages
final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(destination);
Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
for (long i = 0; i < numMessagesToSend; i++) {
producer.send(session.createTextMessage("test"));
long count = produced.incrementAndGet();
if (count % 10000 == 0) {
LOG.info("Produced " + count + " messages");
}
}
} catch (Throwable ex) {
ex.printStackTrace();
} finally {
try {
producer.close();
session.close();
} catch (Exception e) {
}
final AtomicInteger warnings = new AtomicInteger();
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.INFO) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
LOG.info("received log message: " + event.getMessage());
warnings.incrementAndGet();
}
}
};
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(Topic.class);
log4jLogger.addAppender(appender);
try {
producingThread.start();
// Start producing the test messages
final Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(destination);
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return consumed.get() == numMessagesToSend;
}
}, 5 * 60 * 1000); // give it plenty of time before failing
Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
for (long i = 0; i < numMessagesToSend; i++) {
producer.send(session.createTextMessage("test"));
assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
long count = produced.incrementAndGet();
if (count % 10000 == 0) {
LOG.info("Produced " + count + " messages");
}
}
} catch (Throwable ex) {
ex.printStackTrace();
} finally {
try {
producer.close();
session.close();
} catch (Exception e) {
}
}
}
};
assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return blockedCounter.get() > 0;
}
}, 5 * 1000));
producingThread.start();
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return consumed.get() == numMessagesToSend;
}
}, 5 * 60 * 1000); // give it plenty of time before failing
assertEquals("Didn't produce all messages", numMessagesToSend, produced.get());
assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return blockedCounter.get() > 0;
}
}, 5 * 1000));
LOG.info("BlockedCount: " + blockedCounter.get() + ", Warnings:" + warnings.get());
assertTrue("got a few warnings", warnings.get() > 1);
assertTrue("warning limited", warnings.get() < blockedCounter.get());
} finally {
log4jLogger.removeAppender(appender);
}
}
protected Destination createDestination(Session listenerSession) throws Exception {