AMQ-6967 - ensure there are some messages paged in for periodic expiry check if non are in memory

(cherry picked from commit 026c6f4403)
This commit is contained in:
gtully 2018-05-21 14:26:45 +01:00 committed by Christopher L. Shannon (cshannon)
parent e3f76e1693
commit cdae25ecfb
2 changed files with 36 additions and 2 deletions

View File

@ -1207,9 +1207,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
return (alreadyPagedIn < max)
return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
&& (alreadyPagedIn < messagesInQueue)
&& messages.hasSpace();
&& messages.hasSpace());
}
private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,

View File

@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@ -44,8 +45,11 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JDBCPersistenceAdapterExpiredMessageTest {
private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapterExpiredMessageTest.class);
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@ -110,6 +114,7 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(5000);
defaultEntry.setMaxExpirePageSize(expireSize);
defaultEntry.setMemoryLimit(100*16*1024);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
@ -154,4 +159,33 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
}
}, 15000, 1000));
}
@Test
public void testExpiredAfterCacheExhausted() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue("test.q");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setWatchTopicAdvisories(false);
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sess.createProducer(queue);
producer.setTimeToLive(1000);
String payLoad = new String(new byte[16*1024]);
final int numMessages = 500;
for (int i = 0; i < numMessages; i++) {
producer.send(sess.createTextMessage("test message: " + payLoad));
}
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
long expired = brokerService.getDestination(queue).getDestinationStatistics().getExpired().getCount();
LOG.info("Expired: " + expired);
return expired == numMessages;
}
}, 15000, 1000));
}
}