ARTEMIS-2430 Avoid data loss when live page cache evicted
This commit is contained in:
parent
c9f2833863
commit
b92cde165b
|
@ -161,6 +161,12 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
|||
if (!pagingStore.checkPageFileExists((int) pageId)) {
|
||||
return null;
|
||||
}
|
||||
Page currentPage = pagingStore.getCurrentPage();
|
||||
// Live page cache might be cleared by gc, we need to retrieve it otherwise partially written page cache is being returned
|
||||
if (currentPage != null && currentPage.getPageId() == pageId && (cache = currentPage.getLiveCache()) != null) {
|
||||
softCache.put(cache.getPageId(), cache);
|
||||
return cache;
|
||||
}
|
||||
inProgressReadPage = inProgressReadPages.get(pageId);
|
||||
if (inProgressReadPage == null) {
|
||||
final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
|
||||
|
|
|
@ -100,6 +100,10 @@ public final class Page implements Comparable<Page> {
|
|||
this.pageCache = pageCache;
|
||||
}
|
||||
|
||||
public LivePageCache getLiveCache() {
|
||||
return pageCache;
|
||||
}
|
||||
|
||||
public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("reading page " + this.pageId + " on address = " + storeName);
|
||||
|
|
|
@ -6586,6 +6586,69 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
server.stop();
|
||||
}
|
||||
|
||||
// We send messages to page, evict live page cache, send last message when mid consumed, and expect to receive all messages
|
||||
@Test
|
||||
public void testLivePageCacheEvicted() throws Throwable {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
server.start();
|
||||
|
||||
try {
|
||||
ServerLocator locator = createInVMNonHALocator().setBlockOnDurableSend(true);
|
||||
ClientSessionFactory sf = locator.createSessionFactory();
|
||||
ClientSession session = sf.createSession(true, true, 0);
|
||||
|
||||
session.createQueue(ADDRESS, ADDRESS, null, true);
|
||||
|
||||
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
|
||||
store.startPaging();
|
||||
|
||||
ClientProducer prod = session.createProducer(ADDRESS);
|
||||
int num = 10;
|
||||
for (int i = 0; i < num; i++) {
|
||||
ClientMessage msg = session.createMessage(true);
|
||||
msg.putIntProperty("index", i);
|
||||
prod.send(msg);
|
||||
}
|
||||
|
||||
session.start();
|
||||
ClientConsumer cons = session.createConsumer(ADDRESS);
|
||||
ClientMessage msgReceivedCons = null;
|
||||
// simulate the live page cache evicted
|
||||
store.getCursorProvider().clearCache();
|
||||
for (int i = 0; i < num; i++) {
|
||||
msgReceivedCons = cons.receive(1000);
|
||||
assertNotNull(msgReceivedCons);
|
||||
assertTrue(msgReceivedCons.getIntProperty("index") == i);
|
||||
msgReceivedCons.acknowledge();
|
||||
|
||||
session.commit();
|
||||
|
||||
if (i == num / 2) {
|
||||
ClientMessage msg = session.createMessage(true);
|
||||
msg.putIntProperty("index", num);
|
||||
prod.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
msgReceivedCons = cons.receive(1000);
|
||||
assertNotNull(msgReceivedCons);
|
||||
assertTrue(msgReceivedCons.getIntProperty("index") == num);
|
||||
msgReceivedCons.acknowledge();
|
||||
|
||||
assertNull(cons.receiveImmediate());
|
||||
|
||||
session.commit();
|
||||
session.close();
|
||||
|
||||
waitForNotPaging(store);
|
||||
} finally {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
|
||||
Configuration configuration = super.createDefaultConfig(serverID, netty);
|
||||
|
|
Loading…
Reference in New Issue