This closes #320

This commit is contained in:
Clebert Suconic 2016-01-13 15:53:55 -05:00
commit ff47ab2136
2 changed files with 9 additions and 7 deletions

View File

@ -1271,9 +1271,9 @@ final class PageSubscriptionImpl implements PageSubscription {
deliveredCount.incrementAndGet();
PagedReference delivery = currentDelivery;
if (delivery != null) {
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPosition());
if (info != null) {
info.remove(currentDelivery.getPosition());
info.remove(delivery.getPosition());
}
}
}

View File

@ -4123,7 +4123,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setThreadPoolMaxSize(5).setJournalSyncNonTransactional(false);
Map<String, AddressSettings> settings = new HashMap<>();
AddressSettings dla = new AddressSettings().setMaxDeliveryAttempts(5).setDeadLetterAddress(new SimpleString("DLA"));
AddressSettings dla = new AddressSettings().setMaxDeliveryAttempts(5).setDeadLetterAddress(new SimpleString("DLA")).setRedeliveryDelay(0);
settings.put(ADDRESS.toString(), dla);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
@ -4132,6 +4132,7 @@ public class PagingTest extends ActiveMQTestBase {
final int messageSize = 1024;
ServerLocator locator = null;
ClientSessionFactory sf = null;
ClientSession session = null;
@ -4140,16 +4141,17 @@ public class PagingTest extends ActiveMQTestBase {
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
sf = locator.createSessionFactory();
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
session.createQueue("DLA", "DLA", true);
Queue serverQueue = server.locateQueue(ADDRESS);
Queue serverQueueDLA = server.locateQueue(SimpleString.toSimpleString("DLA"));
PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
pgStoreAddress.startPaging();
PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA"));
@ -4185,7 +4187,7 @@ public class PagingTest extends ActiveMQTestBase {
msg.acknowledge();
assertEquals("str" + msgNr, msg.getStringProperty("id"));
// assertEquals("str" + msgNr, msg.getStringProperty("id"));
for (int j = 0; j < messageSize; j++) {
assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
@ -4309,7 +4311,7 @@ public class PagingTest extends ActiveMQTestBase {
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
pgStoreAddress.getCursorProvider().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvider().getSubscription(serverQueue.getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvider().cleanup();