merge #35 - Fixing issues with paging (browsing & depaging)

This commit is contained in:
Andy Taylor 2014-12-08 11:28:02 +00:00
commit d42481db5c
4 changed files with 143 additions and 5 deletions

View File

@ -664,6 +664,8 @@ public class QueueImpl implements Queue
// no-op
scheduledRunners.decrementAndGet();
}
checkDepage();
}
}
@ -2188,12 +2190,32 @@ public class QueueImpl implements Queue
}
}
if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending)
checkDepage();
}
private void checkDepage()
{
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext())
{
scheduleDepage(false);
}
}
/**
* This is a common check we do before scheduling depaging.. or while depaging.
* Before scheduling a depage runnable we verify if it fits / needs depaging.
* We also check for while needsDepage While depaging.
* This is just to avoid a copy & paste dependency
* @return
*/
private boolean needsDepage()
{
return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
}
private SimpleString extractGroupID(MessageReference ref)
{
if (internalQueue)
@ -2268,7 +2290,7 @@ public class QueueImpl implements Queue
this.directDeliver = false;
int depaged = 0;
while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext())
{
depaged++;
PagedReference reference = pageIterator.next();

View File

@ -213,7 +213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
}
else
{

View File

@ -30,8 +30,8 @@
<staging.siteURL>scp://people.apache.org/x1/www/activemq.apache.org</staging.siteURL>
<netty.version>4.0.20.Final</netty.version>
<activemq.version.versionName>Active Hornet</activemq.version.versionName>
<activemq.version.majorVersion>2</activemq.version.majorVersion>
<activemq.version.minorVersion>5</activemq.version.minorVersion>
<activemq.version.majorVersion>6</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionSuffix>SNAPSHOT</activemq.version.versionSuffix>

View File

@ -5184,6 +5184,8 @@ public class PagingTest extends ServiceTestBase
* When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
*
* -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
*
* Note: Idea should get these from the pom and you shouldn't need to do this.
*/
public void testFailMessagesNonDurable() throws Exception
{
@ -5859,6 +5861,120 @@ public class PagingTest extends ServiceTestBase
}
@Test
public void testMultiFiltersBrowsing() throws Throwable
{
internalTestMultiFilters(true);
}
@Test
public void testMultiFiltersRegularConsumer() throws Throwable
{
internalTestMultiFilters(false);
}
public void internalTestMultiFilters(boolean browsing) throws Throwable
{
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig()
.setJournalSyncNonTransactional(false);
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
try
{
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnDurableSend(true);
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true, 0);
session.createQueue(ADDRESS.toString(), "Q1", null, true);
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
ClientProducer prod = session.createProducer(ADDRESS);
ClientMessage msg = null;
store.startPaging();
for (int i = 0; i < 100; i++)
{
msg = session.createMessage(true);
msg.putStringProperty("color", "red");
msg.putIntProperty("count", i);
prod.send(msg);
if (i > 0 && i % 10 == 0)
{
store.startPaging();
store.forceAnotherPage();
}
}
for (int i = 0; i < 100; i++)
{
msg = session.createMessage(true);
msg.putStringProperty("color", "green");
msg.putIntProperty("count", i);
prod.send(msg);
if (i > 0 && i % 10 == 0)
{
store.startPaging();
store.forceAnotherPage();
}
}
session.commit();
session.close();
session = sf.createSession(false, false, 0);
session.start();
ClientConsumer cons1;
if (browsing)
{
cons1 = session.createConsumer("Q1", "color='green'", true);
}
else
{
cons1 = session.createConsumer("Q1", "color='red'", false);
}
for (int i = 0; i < 100; i++)
{
msg = cons1.receive(5000);
System.out.println("Received " + msg);
assertNotNull(msg);
if (!browsing)
{
msg.acknowledge();
}
}
session.commit();
session.close();
}
finally
{
server.stop();
}
}
@Test
public void testPendingACKOutOfOrder() throws Throwable
{