From 1359e8eae2456449cd37f4edc53afce20102ab03 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 8 May 2015 13:30:33 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4068 fix intermittent test failure. Rework usage check to prevent additions to the store rather than blocking scheduled dispatch from the store --- .../broker/scheduler/SchedulerBroker.java | 52 +++++++++---------- .../usage/JobSchedulerStoreUsageTest.java | 25 +++++---- 2 files changed, 42 insertions(+), 35 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 6cd476f629..70a981612f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -155,6 +155,32 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) { + // Check for room in the job scheduler store + if (systemUsage.getJobSchedulerUsage() != null) { + JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage(); + if (usage.isFull()) { + final String logMessage = "Job Scheduler Store is Full (" + + usage.getPercentUsage() + "% of " + usage.getLimit() + + "). Stopping producer (" + messageSend.getProducerId() + + ") to prevent flooding of the job scheduler store." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + + long start = System.currentTimeMillis(); + long nextWarn = start; + while (!usage.waitForSpace(1000)) { + if (context.getStopping().get()) { + throw new IOException("Connection closed, send aborted."); + } + + long now = System.currentTimeMillis(); + if (now >= nextWarn) { + LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)"); + nextWarn = now + 30000l; + } + } + } + } + if (context.isInTransaction()) { context.getTransaction().addSynchronization(new Synchronization() { @Override @@ -212,32 +238,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class); } - // Check for room in the job scheduler store - if (systemUsage.getJobSchedulerUsage() != null) { - JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage(); - if (usage.isFull()) { - final String logMessage = "Job Scheduler Store is Full (" + - usage.getPercentUsage() + "% of " + usage.getLimit() + - "). Stopping producer (" + messageSend.getProducerId() + - ") to prevent flooding of the job scheduler store." + - " See http://activemq.apache.org/producer-flow-control.html for more info"; - - long start = System.currentTimeMillis(); - long nextWarn = start; - while (!usage.waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); - } - - long now = System.currentTimeMillis(); - if (now >= nextWarn) { - LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)"); - nextWarn = now + 30000l; - } - } - } - } - if (repeat != 0 || cronStr != null && cronStr.length() > 0) { // create a unique id - the original message could be sent // lots of times diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java index f9697d975a..057c5df0ac 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/JobSchedulerStoreUsageTest.java @@ -34,6 +34,8 @@ import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertNotEquals; + public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class); @@ -60,7 +62,7 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport { return true; } - public void testJmx() throws Exception { + public void testBlockAndChangeViaJmxReleases() throws Exception { LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage()); @@ -82,25 +84,30 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport { assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit()); - // wait for the producer to block - Thread.sleep(WAIT_TIME_MILLS / 2); + assertTrue("Usage exhausted", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount()); + return broker.getAdminView().getJobSchedulerStorePercentUsage() > 100; + } + })); - assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100); + LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount()); + + assertNotEquals("Producer has not sent all messages", producer.getMessageCount(), producer.getSentCount()); broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33); - Thread.sleep(WAIT_TIME_MILLS); + LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount()); Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { return producer.getSentCount() == producer.getMessageCount(); } - }, WAIT_TIME_MILLS * 2); + }); - assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount()); - - LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage()); + assertEquals("Producer sent all messages", producer.getMessageCount(), producer.getSentCount()); assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100); }