From 993499daafbe546dbeae22febeeb5ad98b140396 Mon Sep 17 00:00:00 2001 From: JiriOndrusek Date: Mon, 23 Jul 2018 14:17:24 +0200 Subject: [PATCH] [ARTEMIS-1986] PagingTest#testDeletePhysicalPages will fail if a record about deleting a page is not saved in journal --- .../tests/integration/paging/PagingTest.java | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 566142d5d4..1436bea031 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -1585,6 +1585,154 @@ public class PagingTest extends ActiveMQTestBase { } + // 4 messages are send/received, it creates 2 pages, where for second page there is no delete completion record in journal + // server is restarted and 4 messages sent/received again. There should be no lost message. + @Test + public void testRestartWithCompleteAndDeletedPhysicalPage() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + final AtomicBoolean mainCleanup = new AtomicBoolean(true); + + class InterruptedCursorProvider extends PageCursorProviderImpl { + + InterruptedCursorProvider(PagingStore pagingStore, + StorageManager storageManager, + ArtemisExecutor executor, + int maxCacheSize) { + super(pagingStore, storageManager, executor, maxCacheSize); + } + + @Override + public void cleanup() { + if (mainCleanup.get()) { + super.cleanup(); + } else { + try { + pagingStore.unlock(); + } catch (Throwable ignored) { + } + } + } + } + + server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) { + @Override + protected PagingStoreFactoryNIO getPagingStoreFactory() { + return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) { + @Override + public PageCursorProvider newCursorProvider(PagingStore store, + StorageManager storageManager, + AddressSettings addressSettings, + ArtemisExecutor executor) { + return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); + } + }; + } + + }; + + addServer(server); + + AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(MESSAGE_SIZE). + setMaxSizeBytes(2 * MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + server.start(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + ClientSession session = sf.createSession(true, true, 0); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); + + Queue queue = server.locateQueue(ADDRESS); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + ClientMessage message; + + for (int i = 0; i < 4; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(new byte[MESSAGE_SIZE]); + + producer.send(message); + session.commit(); + + //last page (#2, whch contains only message #3) is marked as complete - is full - but no delete complete record is added + if (i == 3) { + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + + } + + Assert.assertEquals(3, queue.getPageSubscription().getPagingStore().getCurrentWritingPage()); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + session.start(); + + for (int i = 0; i < 4; i++) { + message = consumer.receive(5000); + Assert.assertNotNull("Before restart - message " + i + " is empty.",message); + message.acknowledge(); + } + + + + server.stop(); + mainCleanup.set(false); + + + + // Deleting the paging data. Simulating a failure + // a dumb user, or anything that will remove the data + deleteDirectory(new File(getPageDir())); + + logger.trace("Server restart"); + + server.start(); + + locator = createInVMNonHALocator(); + sf = createSessionFactory(locator); + session = sf.createSession(null, null, false, false, true, false, 0); + producer = session.createProducer(PagingTest.ADDRESS); + + for (int i = 0; i < 4; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(new byte[MESSAGE_SIZE]); + + + producer.send(message); + } + session.commit(); + + mainCleanup.set(true); + + queue = server.locateQueue(ADDRESS); + queue.getPageSubscription().cleanupEntries(false); + queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); + + consumer = session.createConsumer(ADDRESS); + session.start(); + + for (int i = 0; i < 4; i++) { + message = consumer.receive(5000); + Assert.assertNotNull("After restart - message " + i + " is empty.",message); + message.acknowledge(); + } + + server.stop(); + + } + @Test public void testMissingTXEverythingAcked() throws Exception { clearDataRecreateServerDirs();