diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 21015233cd..183ecd35a7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1238,9 +1238,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()}); - return (alreadyPagedIn < max) + return (alreadyPagedIn == 0 || (alreadyPagedIn < max) && (alreadyPagedIn < messagesInQueue) - && messages.hasSpace(); + && messages.hasSpace()); } private void addAll(Collection refs, List l, int max, diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java index e8e819c8f4..29fae9571e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; @@ -44,8 +45,11 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JDBCPersistenceAdapterExpiredMessageTest { + private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapterExpiredMessageTest.class); @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); @@ -110,6 +114,7 @@ public class JDBCPersistenceAdapterExpiredMessageTest { PolicyEntry defaultEntry = new PolicyEntry(); defaultEntry.setExpireMessagesPeriod(5000); defaultEntry.setMaxExpirePageSize(expireSize); + defaultEntry.setMemoryLimit(100*16*1024); policyMap.setDefaultEntry(defaultEntry); brokerService.setDestinationPolicy(policyMap); @@ -154,4 +159,33 @@ public class JDBCPersistenceAdapterExpiredMessageTest { } }, 15000, 1000)); } + + @Test + public void testExpiredAfterCacheExhausted() throws Exception { + final ActiveMQQueue queue = new ActiveMQQueue("test.q"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.setWatchTopicAdvisories(false); + Connection conn = factory.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = sess.createProducer(queue); + producer.setTimeToLive(1000); + String payLoad = new String(new byte[16*1024]); + + final int numMessages = 500; + for (int i = 0; i < numMessages; i++) { + producer.send(sess.createTextMessage("test message: " + payLoad)); + } + + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + long expired = brokerService.getDestination(queue).getDestinationStatistics().getExpired().getCount(); + LOG.info("Expired: " + expired); + return expired == numMessages; + } + }, 15000, 1000)); + } }