This closes #2369
This commit is contained in:
commit
54db13326d
|
@ -364,6 +364,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
|
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
|
||||||
|
|
||||||
long minPage = checkMinPage(cursorList);
|
long minPage = checkMinPage(cursorList);
|
||||||
|
deliverIfNecessary(cursorList, minPage);
|
||||||
|
|
||||||
logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
|
logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
|
||||||
|
|
||||||
|
@ -599,6 +600,24 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void deliverIfNecessary(Collection<PageSubscription> cursorList, long minPage) {
|
||||||
|
boolean currentWriting = minPage == pagingStore.getCurrentWritingPage() ? true : false;
|
||||||
|
for (PageSubscription cursor : cursorList) {
|
||||||
|
long firstPage = cursor.getFirstPage();
|
||||||
|
if (firstPage == minPage) {
|
||||||
|
/**
|
||||||
|
* if first page is current writing page and it's not complete, or
|
||||||
|
* first page is before the current writing page, we need to trigger
|
||||||
|
* deliverAsync to delete messages in the pages.
|
||||||
|
*/
|
||||||
|
if (cursor.getQueue().getMessageCount() == 0 && (!currentWriting || !cursor.isComplete(firstPage))) {
|
||||||
|
cursor.getQueue().deliverAsync();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Inner classes -------------------------------------------------
|
// Inner classes -------------------------------------------------
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6289,6 +6289,170 @@ public class PagingTest extends ActiveMQTestBase {
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStopPagingWithoutConsumersIfTwoPages() throws Exception {
|
||||||
|
testStopPagingWithoutConsumersOnOneQueue(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStopPagingWithoutConsumersIfOnePage() throws Exception {
|
||||||
|
testStopPagingWithoutConsumersOnOneQueue(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception {
|
||||||
|
boolean persistentMessages = true;
|
||||||
|
|
||||||
|
clearDataRecreateServerDirs();
|
||||||
|
|
||||||
|
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||||
|
|
||||||
|
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||||
|
|
||||||
|
ClientSessionFactory sf = locator.createSessionFactory();
|
||||||
|
ClientSession session = sf.createSession(false, false, false);
|
||||||
|
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1 or both=true"), true);
|
||||||
|
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2 or both=true"), true);
|
||||||
|
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
|
||||||
|
Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1"));
|
||||||
|
queue.getPageSubscription().getPagingStore().startPaging();
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||||
|
ClientMessage message = session.createMessage(persistentMessages);
|
||||||
|
message.putBooleanProperty("both", true);
|
||||||
|
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||||
|
bodyLocal.writeBytes(new byte[1024]);
|
||||||
|
producer.send(message);
|
||||||
|
session.commit();
|
||||||
|
session.start();
|
||||||
|
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
|
||||||
|
message = consumer.receive(5000);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.acknowledge();
|
||||||
|
assertNull(consumer.receiveImmediate());
|
||||||
|
consumer.close();
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
if (forceAnotherPage) {
|
||||||
|
queue.getPageSubscription().getPagingStore().forceAnotherPage();
|
||||||
|
}
|
||||||
|
|
||||||
|
message = session.createMessage(persistentMessages);
|
||||||
|
message.putIntProperty("destQ", 1);
|
||||||
|
bodyLocal = message.getBodyBuffer();
|
||||||
|
bodyLocal.writeBytes(new byte[1024]);
|
||||||
|
producer.send(message);
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
message = consumer.receive(5000);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.acknowledge();
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
assertNull(consumer.receiveImmediate());
|
||||||
|
consumer.close();
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
store.getCursorProvider().cleanup();
|
||||||
|
waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1")));
|
||||||
|
sf.close();
|
||||||
|
locator.close();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
server.stop();
|
||||||
|
} catch (Throwable ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStopPagingWithoutMsgsOnOneQueue() throws Exception {
|
||||||
|
clearDataRecreateServerDirs();
|
||||||
|
|
||||||
|
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||||
|
|
||||||
|
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final int numberOfMessages = 500;
|
||||||
|
|
||||||
|
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||||
|
|
||||||
|
sf = createSessionFactory(locator);
|
||||||
|
|
||||||
|
ClientSession session = sf.createSession(false, false, false);
|
||||||
|
|
||||||
|
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1"), true);
|
||||||
|
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2"), true);
|
||||||
|
|
||||||
|
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||||
|
ClientConsumer consumer1 = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
|
||||||
|
session.start();
|
||||||
|
ClientSession session2 = sf.createSession(false, false, false);
|
||||||
|
ClientConsumer consumer2 = session2.createConsumer(PagingTest.ADDRESS.concat("=2"));
|
||||||
|
session2.start();
|
||||||
|
|
||||||
|
ClientMessage message = null;
|
||||||
|
|
||||||
|
byte[] body = new byte[MESSAGE_SIZE];
|
||||||
|
|
||||||
|
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||||
|
|
||||||
|
for (int j = 1; j <= MESSAGE_SIZE; j++) {
|
||||||
|
bb.put(getSamplebyte(j));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Here we first send messages and consume them to move every subscription to the next bookmarked page.
|
||||||
|
* Then we send messages and consume them again, expecting paging is stopped normally.
|
||||||
|
*/
|
||||||
|
for (int x = 0; x < 2; x++) {
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
message = session.createMessage(true);
|
||||||
|
message.putIntProperty("destQ", 1);
|
||||||
|
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||||
|
bodyLocal.writeBytes(body);
|
||||||
|
producer.send(message);
|
||||||
|
if (i % 1000 == 0) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
|
||||||
|
assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
ClientMessage msg = consumer1.receive(1000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
msg.acknowledge();
|
||||||
|
if (i % 500 == 0) {
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
assertNull(consumer1.receiveImmediate());
|
||||||
|
waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1")));
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
consumer1.close();
|
||||||
|
consumer2.close();
|
||||||
|
session.close();
|
||||||
|
session2.close();
|
||||||
|
sf.close();
|
||||||
|
locator.close();
|
||||||
|
locator = null;
|
||||||
|
sf = null;
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
|
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
|
||||||
Configuration configuration = super.createDefaultConfig(serverID, netty);
|
Configuration configuration = super.createDefaultConfig(serverID, netty);
|
||||||
|
|
Loading…
Reference in New Issue