https://issues.apache.org/jira/browse/AMQ-4068 fix intermittent test failure. Rework usage check to prevent additions to the store rather than blocking scheduled dispatch from the store

This commit is contained in:
gtully 2015-05-08 13:30:33 +01:00
parent 3bfffca9c9
commit 1359e8eae2
2 changed files with 42 additions and 35 deletions

View File

@ -155,6 +155,32 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
} else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
// Check for room in the job scheduler store
if (systemUsage.getJobSchedulerUsage() != null) {
JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
if (usage.isFull()) {
final String logMessage = "Job Scheduler Store is Full (" +
usage.getPercentUsage() + "% of " + usage.getLimit() +
"). Stopping producer (" + messageSend.getProducerId() +
") to prevent flooding of the job scheduler store." +
" See http://activemq.apache.org/producer-flow-control.html for more info";
long start = System.currentTimeMillis();
long nextWarn = start;
while (!usage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
long now = System.currentTimeMillis();
if (now >= nextWarn) {
LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
nextWarn = now + 30000l;
}
}
}
}
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
@ -212,32 +238,6 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
}
// Check for room in the job scheduler store
if (systemUsage.getJobSchedulerUsage() != null) {
JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
if (usage.isFull()) {
final String logMessage = "Job Scheduler Store is Full (" +
usage.getPercentUsage() + "% of " + usage.getLimit() +
"). Stopping producer (" + messageSend.getProducerId() +
") to prevent flooding of the job scheduler store." +
" See http://activemq.apache.org/producer-flow-control.html for more info";
long start = System.currentTimeMillis();
long nextWarn = start;
while (!usage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
long now = System.currentTimeMillis();
if (now >= nextWarn) {
LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
nextWarn = now + 30000l;
}
}
}
}
if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
// create a unique id - the original message could be sent
// lots of times

View File

@ -34,6 +34,8 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotEquals;
public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreUsageTest.class);
@ -60,7 +62,7 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
return true;
}
public void testJmx() throws Exception {
public void testBlockAndChangeViaJmxReleases() throws Exception {
LOG.info("Initial scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
@ -82,25 +84,30 @@ public class JobSchedulerStoreUsageTest extends EmbeddedBrokerTestSupport {
assertEquals(7 * 1024, broker.getAdminView().getJobSchedulerStoreLimit());
// wait for the producer to block
Thread.sleep(WAIT_TIME_MILLS / 2);
assertTrue("Usage exhausted", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount());
return broker.getAdminView().getJobSchedulerStorePercentUsage() > 100;
}
}));
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() > 100);
LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount());
assertNotEquals("Producer has not sent all messages", producer.getMessageCount(), producer.getSentCount());
broker.getAdminView().setJobSchedulerStoreLimit(1024 * 1024 * 33);
Thread.sleep(WAIT_TIME_MILLS);
LOG.info("scheduler store usage %" + broker.getAdminView().getJobSchedulerStorePercentUsage() + " producerSent count:" + producer.getSentCount());
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return producer.getSentCount() == producer.getMessageCount();
}
}, WAIT_TIME_MILLS * 2);
});
assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
LOG.info("Final scheduler usage: {}", broker.getAdminView().getJobSchedulerStorePercentUsage());
assertEquals("Producer sent all messages", producer.getMessageCount(), producer.getSentCount());
assertTrue(broker.getAdminView().getJobSchedulerStorePercentUsage() < 100);
}