mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4930 - ensure we page in messages for browse/expire when destination stats are disabled via config
This commit is contained in:
parent
6bdce73d83
commit
41659725f4
|
@ -1232,9 +1232,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
} finally {
|
||||
pagedInMessagesLock.readLock().unlock();
|
||||
}
|
||||
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, destinationStatistics.getMessages().getCount(), memoryUsage.getPercentUsage()});
|
||||
int messagesInQueue = 0;
|
||||
messagesLock.readLock().lock();
|
||||
try {
|
||||
messagesInQueue = messages.size();
|
||||
} finally {
|
||||
messagesLock.readLock().unlock();
|
||||
}
|
||||
|
||||
LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
|
||||
return (alreadyPagedIn < max)
|
||||
&& (alreadyPagedIn < destinationStatistics.getMessages().getCount())
|
||||
&& (alreadyPagedIn < messagesInQueue)
|
||||
&& messages.hasSpace();
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.activemq.broker.region.RegionBroker;
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -37,6 +38,7 @@ public class AMQ4930Test extends TestCase {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
|
||||
final int messageCount = 150;
|
||||
final int messageSize = 1024*1024;
|
||||
final int maxBrowsePageSize = 50;
|
||||
final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
|
||||
BrokerService broker;
|
||||
ActiveMQConnectionFactory factory;
|
||||
|
@ -50,8 +52,8 @@ public class AMQ4930Test extends TestCase {
|
|||
PolicyEntry policy = new PolicyEntry();
|
||||
// disable expriy processing as this will call browse in parallel
|
||||
policy.setExpireMessagesPeriod(0);
|
||||
policy.setMaxPageSize(50);
|
||||
policy.setMaxBrowsePageSize(50);
|
||||
policy.setMaxPageSize(maxBrowsePageSize);
|
||||
policy.setMaxBrowsePageSize(maxBrowsePageSize);
|
||||
pMap.setDefaultEntry(policy);
|
||||
|
||||
broker.setDestinationPolicy(pMap);
|
||||
|
@ -65,6 +67,11 @@ public class AMQ4930Test extends TestCase {
|
|||
doTestBrowsePending(DeliveryMode.PERSISTENT);
|
||||
}
|
||||
|
||||
public void testWithStatsDisabled() throws Exception {
|
||||
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().setEnabled(false);
|
||||
doTestBrowsePending(DeliveryMode.PERSISTENT);
|
||||
}
|
||||
|
||||
public void doTestBrowsePending(int deliveryMode) throws Exception {
|
||||
|
||||
Connection connection = factory.createConnection();
|
||||
|
@ -77,7 +84,6 @@ public class AMQ4930Test extends TestCase {
|
|||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
producer.send(bigQueue, bytesMessage);
|
||||
LOG.info("Sent: " + i);
|
||||
}
|
||||
|
||||
final QueueViewMBean queueViewMBean = (QueueViewMBean)
|
||||
|
@ -94,15 +100,21 @@ public class AMQ4930Test extends TestCase {
|
|||
final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
|
||||
|
||||
// do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
|
||||
underTest.browse();
|
||||
underTest.browse();
|
||||
Message[] browsed = underTest.browse();
|
||||
LOG.info("Browsed: " + browsed.length);
|
||||
assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
|
||||
browsed = underTest.browse();
|
||||
LOG.info("Browsed: " + browsed.length);
|
||||
assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
|
||||
Runtime.getRuntime().gc();
|
||||
long free = Runtime.getRuntime().freeMemory()/1024;
|
||||
LOG.info("free at start of check: " + free);
|
||||
// check for memory growth
|
||||
for (int i=0; i<10; i++) {
|
||||
LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
|
||||
underTest.browse();
|
||||
browsed = underTest.browse();
|
||||
LOG.info("Browsed: " + browsed.length);
|
||||
assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
|
||||
Runtime.getRuntime().gc();
|
||||
Runtime.getRuntime().gc();
|
||||
assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1)));
|
||||
|
|
Loading…
Reference in New Issue