ACTIVEMQ6-54 Depaging is not kicking in on some scenarios, and Browsing is not looking towards paging

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

This is fixing a few issues around paging:
- Browsing it not looking towards Paging. I'm using the queue.totalIterator which is a read-only iterator that goes towards the pages messages.
- Depage is not kicking correctly in some scenarios. I have improved the logic on scheduling depage for that.
This commit is contained in:
Clebert Suconic 2014-12-05 12:25:07 -05:00
parent aec50cf250
commit 933d90a4f3
3 changed files with 141 additions and 3 deletions

View File

@ -664,6 +664,8 @@ public class QueueImpl implements Queue
// no-op // no-op
scheduledRunners.decrementAndGet(); scheduledRunners.decrementAndGet();
} }
checkDepage();
} }
} }
@ -2188,12 +2190,32 @@ public class QueueImpl implements Queue
} }
} }
if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending) checkDepage();
}
private void checkDepage()
{
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext())
{ {
scheduleDepage(false); scheduleDepage(false);
} }
} }
/**
* This is a common check we do before scheduling depaging.. or while depaging.
* Before scheduling a depage runnable we verify if it fits / needs depaging.
* We also check for while needsDepage While depaging.
* This is just to avoid a copy & paste dependency
* @return
*/
private boolean needsDepage()
{
return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
}
private SimpleString extractGroupID(MessageReference ref) private SimpleString extractGroupID(MessageReference ref)
{ {
if (internalQueue) if (internalQueue)
@ -2268,7 +2290,7 @@ public class QueueImpl implements Queue
this.directDeliver = false; this.directDeliver = false;
int depaged = 0; int depaged = 0;
while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext()) while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext())
{ {
depaged++; depaged++;
PagedReference reference = pageIterator.next(); PagedReference reference = pageIterator.next();

View File

@ -213,7 +213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
if (browseOnly) if (browseOnly)
{ {
browserDeliverer = new BrowserDeliverer(messageQueue.iterator()); browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
} }
else else
{ {

View File

@ -5184,6 +5184,8 @@ public class PagingTest extends ServiceTestBase
* When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly: * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
* *
* -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties * -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
*
* Note: Idea should get these from the pom and you shouldn't need to do this.
*/ */
public void testFailMessagesNonDurable() throws Exception public void testFailMessagesNonDurable() throws Exception
{ {
@ -5859,6 +5861,120 @@ public class PagingTest extends ServiceTestBase
} }
@Test
public void testMultiFiltersBrowsing() throws Throwable
{
internalTestMultiFilters(true);
}
@Test
public void testMultiFiltersRegularConsumer() throws Throwable
{
internalTestMultiFilters(false);
}
public void internalTestMultiFilters(boolean browsing) throws Throwable
{
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig()
.setJournalSyncNonTransactional(false);
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
try
{
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnDurableSend(true);
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true, 0);
session.createQueue(ADDRESS.toString(), "Q1", null, true);
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
ClientProducer prod = session.createProducer(ADDRESS);
ClientMessage msg = null;
store.startPaging();
for (int i = 0; i < 100; i++)
{
msg = session.createMessage(true);
msg.putStringProperty("color", "red");
msg.putIntProperty("count", i);
prod.send(msg);
if (i > 0 && i % 10 == 0)
{
store.startPaging();
store.forceAnotherPage();
}
}
for (int i = 0; i < 100; i++)
{
msg = session.createMessage(true);
msg.putStringProperty("color", "green");
msg.putIntProperty("count", i);
prod.send(msg);
if (i > 0 && i % 10 == 0)
{
store.startPaging();
store.forceAnotherPage();
}
}
session.commit();
session.close();
session = sf.createSession(false, false, 0);
session.start();
ClientConsumer cons1;
if (browsing)
{
cons1 = session.createConsumer("Q1", "color='green'", true);
}
else
{
cons1 = session.createConsumer("Q1", "color='red'", false);
}
for (int i = 0; i < 100; i++)
{
msg = cons1.receive(5000);
System.out.println("Received " + msg);
assertNotNull(msg);
if (!browsing)
{
msg.acknowledge();
}
}
session.commit();
session.close();
}
finally
{
server.stop();
}
}
@Test @Test
public void testPendingACKOutOfOrder() throws Throwable public void testPendingACKOutOfOrder() throws Throwable
{ {