From f18b4ee0c9b48c7bad3cb071b43b4b835e8d2c9d Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 14 Jul 2017 15:04:43 -0400 Subject: [PATCH] ARTEMIS-1287/ARTEMIS-1292 Complete Page on the Journal fixing PagingTest.testDeletePhysicalPages --- .../artemis/core/message/impl/CoreMessage.java | 5 ++++- .../core/paging/cursor/PageSubscription.java | 2 +- .../paging/cursor/impl/PageSubscriptionImpl.java | 7 ++++++- .../core/paging/impl/PagingStoreImpl.java | 16 +++++++++++++--- .../journal/AbstractJournalStorageManager.java | 7 ++++++- .../integration/client/HangConsumerTest.java | 2 ++ .../tests/integration/paging/PagingTest.java | 7 ++++--- 7 files changed, 36 insertions(+), 10 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 369de7dcfd..0428a58fca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -1079,10 +1079,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public String toString() { try { + checkProperties(); return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + - ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties + "]@" + System.identityHashCode(this); } catch (Throwable e) { + e.printStackTrace(); + System.exit(-1); return "ServerMessage[messageID=" + messageID + "]"; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index ee05f61621..cec7f526ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -96,7 +96,7 @@ public interface PageSubscription { */ void reloadACK(PagePosition position); - void reloadPageCompletion(PagePosition position) throws Exception; + boolean reloadPageCompletion(PagePosition position) throws Exception; void reloadPageInfo(long pageNr); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 6ad4f480ae..f0451519da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -190,7 +190,10 @@ final class PageSubscriptionImpl implements PageSubscription { * cursor/subscription. */ @Override - public void reloadPageCompletion(PagePosition position) throws Exception { + public boolean reloadPageCompletion(PagePosition position) throws Exception { + if (!pageStore.checkPageFileExists((int)position.getPageNr())) { + return false; + } // if the current page is complete, we must move it out of the way if (pageStore != null && pageStore.getCurrentPage() != null && pageStore.getCurrentPage().getPageId() == position.getPageNr()) { @@ -201,6 +204,8 @@ final class PageSubscriptionImpl implements PageSubscription { synchronized (consumedPages) { consumedPages.put(Long.valueOf(position.getPageNr()), info); } + + return true; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 6486ec9816..ad9e21801b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -520,6 +520,12 @@ public class PagingStoreImpl implements PagingStore { @Override public boolean checkPageFileExists(final int pageNumber) { String fileName = createFileName(pageNumber); + + try { + checkFileFactory(); + } catch (Exception ignored) { + } + SequentialFile file = fileFactory.createSequentialFile(fileName); return file.exists(); } @@ -528,9 +534,7 @@ public class PagingStoreImpl implements PagingStore { public Page createPage(final int pageNumber) throws Exception { String fileName = createFileName(pageNumber); - if (fileFactory == null) { - fileFactory = storeFactory.newFileFactory(getStoreName()); - } + checkFileFactory(); SequentialFile file = fileFactory.createSequentialFile(fileName); @@ -546,6 +550,12 @@ public class PagingStoreImpl implements PagingStore { return page; } + private void checkFileFactory() throws Exception { + if (fileFactory == null) { + fileFactory = storeFactory.newFileFactory(getStoreName()); + } + } + @Override public void forceAnotherPage() throws Exception { openNewPage(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 0eb1dc312b..dc399c1b7a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1085,7 +1085,12 @@ public abstract class AbstractJournalStorageManager implements StorageManager { PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); if (sub != null) { - sub.reloadPageCompletion(encoding.position); + if (!sub.reloadPageCompletion(encoding.position)) { + if (logger.isDebugEnabled()) { + logger.debug("Complete page " + encoding.position.getPageNr() + " doesn't exist on page manager " + sub.getPagingStore().getAddress()); + } + messageJournal.appendDeleteRecord(record.id, false); + } } else { ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID); messageJournal.appendDeleteRecord(record.id, false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 2fa89fb1c8..0de19a128e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -71,6 +71,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; import org.junit.Assert; @@ -151,6 +152,7 @@ public class HangConsumerTest extends ActiveMQTestBase { // a flush to guarantee any pending task is finished on flushing out delivery and pending msgs queue.flushExecutor(); + Wait.waitFor(() -> getMessageCount(queue) == 2); Assert.assertEquals(2, getMessageCount(queue)); Assert.assertEquals(2, getMessagesAdded(queue)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 0eb8a57844..7eadeca236 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -1430,14 +1430,14 @@ public class PagingTest extends ActiveMQTestBase { producer = session.createProducer(PagingTest.ADDRESS); - for (int i = 0; i < numberOfMessages; i++) { + for (int i = 0; i < numberOfMessages * 2; i++) { message = session.createMessage(true); ActiveMQBuffer bodyLocal = message.getBodyBuffer(); bodyLocal.writeBytes(body); - message.putIntProperty(new SimpleString("id"), i); + message.putIntProperty(new SimpleString("theid"), i); producer.send(message); if (i % 1000 == 0) { @@ -1466,12 +1466,13 @@ public class PagingTest extends ActiveMQTestBase { for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) { log.info("Received " + msgCount); msgReceived++; - ClientMessage msg = consumer.receiveImmediate(); + ClientMessage msg = consumer.receive(5000); if (msg == null) { log.info("It's null. leaving now"); sessionConsumer.commit(); fail("Didn't receive a message"); } + System.out.println("Message " + msg.getIntProperty(SimpleString.toSimpleString("theid"))); msg.acknowledge(); if (msgCount % 5 == 0) {