mirror of https://github.com/apache/activemq.git
AMQ-6967 - ensure there are some messages paged in for periodic expiry check if non are in memory
This commit is contained in:
parent
01384c714d
commit
026c6f4403
|
@ -1238,9 +1238,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
|
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
|
||||||
return (alreadyPagedIn < max)
|
return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
|
||||||
&& (alreadyPagedIn < messagesInQueue)
|
&& (alreadyPagedIn < messagesInQueue)
|
||||||
&& messages.hasSpace();
|
&& messages.hasSpace());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
|
private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
|
@ -44,8 +45,11 @@ import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class JDBCPersistenceAdapterExpiredMessageTest {
|
public class JDBCPersistenceAdapterExpiredMessageTest {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapterExpiredMessageTest.class);
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
|
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
|
||||||
|
@ -110,6 +114,7 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
|
||||||
PolicyEntry defaultEntry = new PolicyEntry();
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
defaultEntry.setExpireMessagesPeriod(5000);
|
defaultEntry.setExpireMessagesPeriod(5000);
|
||||||
defaultEntry.setMaxExpirePageSize(expireSize);
|
defaultEntry.setMaxExpirePageSize(expireSize);
|
||||||
|
defaultEntry.setMemoryLimit(100*16*1024);
|
||||||
policyMap.setDefaultEntry(defaultEntry);
|
policyMap.setDefaultEntry(defaultEntry);
|
||||||
brokerService.setDestinationPolicy(policyMap);
|
brokerService.setDestinationPolicy(policyMap);
|
||||||
|
|
||||||
|
@ -154,4 +159,33 @@ public class JDBCPersistenceAdapterExpiredMessageTest {
|
||||||
}
|
}
|
||||||
}, 15000, 1000));
|
}, 15000, 1000));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpiredAfterCacheExhausted() throws Exception {
|
||||||
|
final ActiveMQQueue queue = new ActiveMQQueue("test.q");
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
|
||||||
|
factory.setWatchTopicAdvisories(false);
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
conn.start();
|
||||||
|
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
MessageProducer producer = sess.createProducer(queue);
|
||||||
|
producer.setTimeToLive(1000);
|
||||||
|
String payLoad = new String(new byte[16*1024]);
|
||||||
|
|
||||||
|
final int numMessages = 500;
|
||||||
|
for (int i = 0; i < numMessages; i++) {
|
||||||
|
producer.send(sess.createTextMessage("test message: " + payLoad));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(new Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
long expired = brokerService.getDestination(queue).getDestinationStatistics().getExpired().getCount();
|
||||||
|
LOG.info("Expired: " + expired);
|
||||||
|
return expired == numMessages;
|
||||||
|
}
|
||||||
|
}, 15000, 1000));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue