diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 33a39e3e04..2a49ec3fe2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -728,7 +728,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon Page page = pages.remove(packet.getPageNumber()); if (page == null) { - page = getPage(packet.getStoreName(), packet.getPageNumber()); + // if page is null, we create it the instance and include it on the map + // then we must recurse this call + // so page.delete or page.close will not leave any closed objects on the hashmap + getPage(packet.getStoreName(), packet.getPageNumber()); + handlePageEvent(packet); + return; } if (page != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 69249c2b2f..2e1c7c2298 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -447,7 +447,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229216, value = "Invalid queue name: {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQIllegalStateException invalidQueueName(SimpleString queueName); - @Message(id = 119217, value = "Can't write to closed file: {0}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 119217, value = "Cannot write to closed file: {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQIOErrorException cannotWriteToClosedFile(SequentialFile file); @Message(id = 229218, value = "Failed to locate broker configuration URL") diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java index d0d9f0f6d1..0797590bbc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java @@ -18,10 +18,19 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover; import java.util.HashMap; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; import org.junit.Test; public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest { @@ -37,6 +46,93 @@ public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest { @Override @Test public void testFailWithBrowser() throws Exception { - // paged messages are not available for browsing + internalBrowser(0); + } + + @Test + public void testFailWithBrowserWithClose() throws Exception { + internalBrowser(1); + } + + @Test + public void testFailWithBrowserWithDelete() throws Exception { + internalBrowser(2); + } + + // + // 0 - no tamper + // 1 - close files + // 2 - remove files + private void internalBrowser(int temperMode) throws Exception { + int numMessages = 50; + int messagesPerPage = 10; + int iterations = 10; + createSessionFactory(); + ClientSession session = createSession(sf, true, true); + + session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, false); + + ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); + + Queue queue = liveServer.getServer().locateQueue(FailoverTest.ADDRESS); + + for (int j = 0; j < iterations; j++) { + System.err.println("#iteration " + j); + queue.getPageSubscription().getPagingStore().startPaging(); + Assert.assertNotNull(queue); + + for (int i = 0; i < numMessages; i++) { + // some are durable, some are not! + producer.send(createMessage(session, i, i % 2 == 0)); + if (i > 0 && i % messagesPerPage == 0) { + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + } + + ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true); + + session.start(); + + while (true) { + ClientMessage msg = consumer.receive(500); + if (msg == null) { + break; + } + } + consumer.close(); + + PagingStore store = queue.getPageSubscription().getPagingStore(); + + if (temperMode == 1) { + // this is tampering with the system causing an artifical issue. The system should still heal itself. + for (long pageID = store.getFirstPage(); pageID <= store.getCurrentPage().getPageId() + 10; pageID++) { + System.out.println("Sending close on " + pageID); + liveServer.getServer().getStorageManager().pageClosed(store.getStoreName(), (int) pageID); + } + } else if (temperMode == 2) { + // this is tampering with the system causing an artifical issue. The system should still heal itself. + for (long pageID = store.getFirstPage(); pageID <= store.getCurrentPage().getPageId() + 10; pageID++) { + System.out.println("Sending close on " + pageID); + liveServer.getServer().getStorageManager().pageDeleted(store.getStoreName(), (int) pageID); + } + } + store.getFirstPage(); + store.getCurrentPage().getPageId(); + + consumer = session.createConsumer(FailoverTestBase.ADDRESS, false); + session.start(); + + while (true) { + ClientMessage msg = consumer.receive(500); + if (msg == null) { + break; + } + msg.acknowledge(); + } + consumer.close(); + + Wait.assertFalse(queue.getPageSubscription().getPagingStore()::isPaging); + } + } }