ACTIVEMQ6-54 Fixing tests broken after Paging fix

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

Changing the order of depaging introduced an extra check that needs to be checked now.
This will probably take care of the issue by checking if the page is complete before depage.
This commit is contained in:
Clebert Suconic 2014-12-10 22:03:10 -05:00
parent 0eb6ebda22
commit 09490cdba3
2 changed files with 146 additions and 22 deletions

View File

@ -425,28 +425,7 @@ public class PageCursorProviderImpl implements PageCursorProvider
// on that case we need to move to verify it in a different way
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
for (PageSubscription cursor : cursorList)
{
if (!cursor.isComplete(minPage))
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
}
complete = false;
break;
}
else
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
}
}
}
boolean complete = checkPageCompletion(cursorList, minPage);
if (!pagingStore.isStarted())
{
@ -475,6 +454,10 @@ public class PageCursorProviderImpl implements PageCursorProvider
for (long i = pagingStore.getFirstPage(); i < minPage; i++)
{
if (!checkPageCompletion(cursorList, i))
{
break;
}
Page page = pagingStore.depage();
if (page == null)
{
@ -577,6 +560,33 @@ public class PageCursorProviderImpl implements PageCursorProvider
}
private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage)
{
boolean complete = true;
for (PageSubscription cursor : cursorList)
{
if (!cursor.isComplete(minPage))
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
}
complete = false;
break;
}
else
{
if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
{
ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
}
}
}
return complete;
}
/**
* @return
*/

View File

@ -112,6 +112,120 @@ public class PagingTest extends ServiceTestBase
locator = createInVMNonHALocator();
}
@Test
public void testPageOnLargeMessageMultipleQueues() throws Exception
{
Configuration config = createDefaultConfig();
final int PAGE_MAX = 20 * 1024;
final int PAGE_SIZE = 10 * 1024;
HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
AddressSettings value = new AddressSettings();
map.put(ADDRESS.toString(), value);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true);
session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true);
ClientProducer producer = session.createProducer(ADDRESS);
ClientMessage message = null;
for (int i = 0; i < 201; i++)
{
message = session.createMessage(true);
message.getBodyBuffer().writerIndex(0);
message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
for (int j = 1; j <= numberOfBytes; j++)
{
message.getBodyBuffer().writeInt(j);
}
producer.send(message);
}
session.close();
server.stop();
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
sf = createSessionFactory(locator);
for (int ad = 0; ad < 2; ad++)
{
session = sf.createSession(false, false, false);
ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + ad));
session.start();
for (int i = 0; i < 201; i++)
{
ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
Assert.assertNotNull(message2);
message2.acknowledge();
Assert.assertNotNull(message2);
}
try
{
if (ad > -1)
{
session.commit();
}
else
{
session.rollback();
for (int i = 0; i < 100; i++)
{
ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
Assert.assertNotNull(message2);
message2.acknowledge();
Assert.assertNotNull(message2);
}
session.commit();
}
}
catch (Throwable e)
{
System.err.println("here!!!!!!!");
e.printStackTrace();
System.exit(-1);
}
consumer.close();
session.close();
}
}
@Test
public void testPageCleanup() throws Exception
{