From 40966c769a7abab3d7042fd0cd9565c3c60098ee Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 18 Dec 2018 18:17:03 -0500 Subject: [PATCH] ARTEMIS-2207 Page Showing Log.warns for regular acked messages --- .../cursor/impl/PageSubscriptionImpl.java | 4 + .../tests/integration/paging/PagingTest.java | 139 ++++++++++++++---- 2 files changed, 111 insertions(+), 32 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 30e8528ea1..7aab4e4248 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -1284,6 +1284,10 @@ public final class PageSubscriptionImpl implements PageSubscription { continue; } + if (info != null && info.isAck(message.getPosition())) { + continue; + } + // 2nd ... if TX, is it committed? if (valid && message.getPagedMessage().getTransactionID() >= 0) { PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage().getTransactionID()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 180993fe85..23a1e32d0b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -71,6 +71,8 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.PageIterator; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl; import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; @@ -277,6 +279,96 @@ public class PagingTest extends ActiveMQTestBase { } } + @Test + public void testPageTX() throws Exception { + AssertionLoggerHandler.startCapture(); + + try { + Configuration config = createDefaultInVMConfig(); + + final int PAGE_MAX = 20 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); + server.start(); + + final int numberOfBytes = 1024; + + locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, false, false, false, 0); + + session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true); + + server.getPagingManager().getPageStore(ADDRESS).forceAnotherPage(); + server.getPagingManager().getPageStore(ADDRESS).disableCleanup(); + session.start(); + + ClientProducer producer = session.createProducer(ADDRESS); + + ClientConsumer browserConsumer = session.createConsumer(ADDRESS.concat("-0"), true); + + ClientMessage message = null; + + for (int i = 0; i < 201; i++) { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[numberOfBytes]); + + for (int j = 1; j <= numberOfBytes; j++) { + message.getBodyBuffer().writeInt(j); + } + + producer.send(message); + session.commit(); + } + + ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-0")); + + session.start(); + + for (int i = 0; i < 201; i++) { + ClientMessage message2 = consumer.receive(10000); + + Assert.assertNotNull(message2); + + message2.acknowledge(); + + Assert.assertNotNull(message2); + + session.commit(); + } + + consumer.close(); + + Queue queue = server.locateQueue(ADDRESS.concat("-0")); + + PagingStore store = server.getPagingManager().getPageStore(ADDRESS); + PageCursorProvider provider = store.getCursorProvider(); + + PageSubscription cursorSubscription = provider.getSubscription(queue.getID()); + PageIterator iterator = (PageIterator) cursorSubscription.iterator(); + + for (int i = 0; i < 5; i++) { + Assert.assertFalse(iterator.hasNext()); + Assert.assertNull(browserConsumer.receiveImmediate()); + } + + session.close(); + Assert.assertFalse(AssertionLoggerHandler.findText("Could not locate page")); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222029")); + server.getPagingManager().getPageStore(ADDRESS).enableCleanup(); + Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging); + } finally { + AssertionLoggerHandler.stopCapture(); + } + } + @Test public void testPageCleanup() throws Exception { clearDataRecreateServerDirs(); @@ -396,7 +488,6 @@ public class PagingTest extends ActiveMQTestBase { System.out.println("pgComplete = " + pgComplete); } - @Test public void testPurge() throws Exception { clearDataRecreateServerDirs(); @@ -410,7 +501,7 @@ public class PagingTest extends ActiveMQTestBase { String queue = "purgeQueue"; SimpleString ssQueue = new SimpleString(queue); server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST)); - QueueImpl purgeQueue = (QueueImpl)server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false); + QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Connection connection = cf.createConnection(); @@ -465,7 +556,6 @@ public class PagingTest extends ActiveMQTestBase { connection.start(); - server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(50000); Assert.assertNotNull(consumer.receive(5000)); session.commit(); @@ -478,7 +568,6 @@ public class PagingTest extends ActiveMQTestBase { StorageManager sm = server.getStorageManager(); - for (int i = 0; i < 1000; i++) { long tx = sm.generateID(); PageTransactionInfoImpl txinfo = new PageTransactionInfoImpl(tx); @@ -494,7 +583,6 @@ public class PagingTest extends ActiveMQTestBase { Assert.assertEquals(0, server.getPagingManager().getTransactions().size()); } - // First page is complete but it wasn't deleted @Test public void testFirstPageCompleteNotDeleted() throws Exception { @@ -1741,17 +1829,13 @@ public class PagingTest extends ActiveMQTestBase { for (int i = 0; i < 4; i++) { message = consumer.receive(5000); - Assert.assertNotNull("Before restart - message " + i + " is empty.",message); + Assert.assertNotNull("Before restart - message " + i + " is empty.", message); message.acknowledge(); } - - server.stop(); mainCleanup.set(false); - - // Deleting the paging data. Simulating a failure // a dumb user, or anything that will remove the data deleteDirectory(new File(getPageDir())); @@ -1772,7 +1856,6 @@ public class PagingTest extends ActiveMQTestBase { bodyLocal.writeBytes(new byte[MESSAGE_SIZE]); - producer.send(message); } session.commit(); @@ -1788,7 +1871,7 @@ public class PagingTest extends ActiveMQTestBase { for (int i = 0; i < 4; i++) { message = consumer.receive(5000); - Assert.assertNotNull("After restart - message " + i + " is empty.",message); + Assert.assertNotNull("After restart - message " + i + " is empty.", message); message.acknowledge(); } @@ -1852,7 +1935,6 @@ public class PagingTest extends ActiveMQTestBase { session.commit(); session.close(); - ArrayList records = new ArrayList<>(); List list = new ArrayList<>(); @@ -1865,9 +1947,7 @@ public class PagingTest extends ActiveMQTestBase { // Delete everything from the journal for (RecordInfo info : records) { - if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE && - info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC && - info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) { + if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) { jrn.appendDeleteRecord(info.id, false); } } @@ -3502,8 +3582,6 @@ public class PagingTest extends ActiveMQTestBase { server.stop(); - // Thread.sleep(5000); - } @Test @@ -4775,9 +4853,7 @@ public class PagingTest extends ActiveMQTestBase { } catch (Throwable e) { log.info("output bytes = " + bytesOutput); log.info(threadDump("dump")); - fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + - " with messageID=" + - message.getMessageID()); + fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID()); } } @@ -6090,16 +6166,16 @@ public class PagingTest extends ActiveMQTestBase { class NonStoppablePagingStoreImpl extends PagingStoreImpl { NonStoppablePagingStoreImpl(SimpleString address, - ScheduledExecutorService scheduledExecutor, - long syncTimeout, - PagingManager pagingManager, - StorageManager storageManager, - SequentialFileFactory fileFactory, - PagingStoreFactory storeFactory, - SimpleString storeName, - AddressSettings addressSettings, - ArtemisExecutor executor, - boolean syncNonTransactional) { + ScheduledExecutorService scheduledExecutor, + long syncTimeout, + PagingManager pagingManager, + StorageManager storageManager, + SequentialFileFactory fileFactory, + PagingStoreFactory storeFactory, + SimpleString storeName, + AddressSettings addressSettings, + ArtemisExecutor executor, + boolean syncNonTransactional) { super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional); } @@ -6452,7 +6528,6 @@ public class PagingTest extends ActiveMQTestBase { server.stop(); } - @Override protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { Configuration configuration = super.createDefaultConfig(serverID, netty);