From 49189cd7e63a64fcda947dbd72fd7849348b71c9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Sat, 18 May 2024 10:48:48 -0400 Subject: [PATCH] ARTEMIS-4776 Pages may leak as open on Replicated Target PagingStore is supposed to send an event to the replica for every file that is closed. There are a few situations where the sendClose is missed, and that could generate leaks on the target. --- .../artemis/core/paging/impl/Page.java | 8 +- .../core/paging/impl/PagingStoreImpl.java | 4 +- .../core/replication/ReplicationEndpoint.java | 4 + .../core/replication/ReplicationManager.java | 3 + .../utils/cli/helper/HelperCreate.java | 16 +- .../failover/NettyReplicatedFailoverTest.java | 88 ++++ .../tests/smoke/checkTest/CheckTest.java | 4 +- .../mirror/ReplicatedMirrorTargetTest.java | 419 ++++++++++++++++++ 8 files changed, 530 insertions(+), 16 deletions(-) create mode 100644 tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index b2c01852a8..ffd6e4e064 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -231,21 +231,21 @@ public final class Page { return isOpen; } - public void close(boolean sendEvent) throws Exception { - close(sendEvent, true); + public void close(boolean sendReplicaClose) throws Exception { + close(sendReplicaClose, true); } /** * sendReplicaClose means it's a close happening from a major event such as moveNext. * While reading the cache we don't need to (and shouldn't) inform the backup */ - public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception { + public synchronized void close(boolean sendReplicaClose, boolean waitSync) throws Exception { if (readFileBuffer != null) { fileFactory.releaseDirectBuffer(readFileBuffer); readFileBuffer = null; } - if (sendEvent && storageManager != null) { + if (sendReplicaClose && storageManager != null) { storageManager.pageClosed(storeName, pageId); } file.close(waitSync, waitSync); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 63baad12c4..28e7de80e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -577,7 +577,7 @@ public class PagingStoreImpl implements PagingStore { final Page page = currentPage; if (page != null) { - page.close(false); + page.close(true); currentPage = null; } } @@ -994,7 +994,7 @@ public class PagingStoreImpl implements PagingStore { } returnPage = currentPage; - returnPage.close(false); + returnPage.close(true); resetCurrentPage(null); // The current page is empty... 
which means we reached the end of the pages diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index b3951c9d1c..4810ba9e9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -902,6 +902,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent this.executor = executor2; } + public ConcurrentMap<SimpleString, ConcurrentMap<Long, Page>> getPageIndex() { + return pageIndex; + } + /** * This is for tests basically, do not use it as its API is not guaranteed for future usage. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 9bbbf46e3a..3906af597e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -778,6 +778,9 @@ public final class ReplicationManager implements ActiveMQComponent { } finally { if (file.isOpen()) file.close(); + if (pageStore != null) { + sendReplicatePacket(new ReplicationPageEventMessage(pageStore, id, false, remotingConnection.isVersionUsingLongOnPageReplication())); + } } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java index a2e25267b0..293138e186 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperCreate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -70,7 +70,7 @@ public class HelperCreate extends HelperBase { private boolean clustered = false; - private boolean slave = false; + private boolean backup = false; private String staticCluster; @@ -184,12 +184,12 @@ public class HelperCreate extends HelperBase { return this; } - public boolean isSlave() { - return slave; + public boolean isBackup() { + return backup; } - public HelperCreate setSlave(boolean slave) { - this.slave = slave; + public HelperCreate setBackup(boolean backup) { + this.backup = backup; return this; } @@ -301,8 +301,8 @@ public class HelperCreate extends HelperBase { add(listCommands, "--no-web"); } - if (slave) { - add(listCommands, "--slave"); + if (backup) { + add(listCommands, "--backup"); } if (replicated) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java index 36deb2ddea..65277e97e1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java @@ -16,13 +16,34 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.failover; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import java.lang.invoke.MethodHandles; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NettyReplicatedFailoverTest extends NettyFailoverTest { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @Override protected TestableServer createTestableServer(Configuration config) { return new SameProcessActiveMQServer(createServer(true, config)); @@ -49,4 +70,71 @@ public class NettyReplicatedFailoverTest extends NettyFailoverTest { protected final void crash(ClientSession... 
sessions) throws Exception { crash(true, sessions); } + + @Test + public void testPagedInSync() throws Exception { + + String queueName = "testPagedInSync"; + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello")); + session.commit(); + + org.apache.activemq.artemis.core.server.Queue serverQueue = primaryServer.getServer().locateQueue(queueName); + Assert.assertNotNull(serverQueue); + + serverQueue.getPagingStore().startPaging(); + + for (int i = 0; i < 50; i++) { + producer.send(session.createTextMessage("hello")); + session.commit(); + serverQueue.getPagingStore().forceAnotherPage(); + } + backupServer.stop(); + backupServer.start(); + Wait.assertTrue(backupServer.getServer()::isReplicaSync); + + SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backupServer.getServer().getActivation(); + Map currentPages = activation.getReplicationEndpoint().getPageIndex().get(SimpleString.toSimpleString(queueName)); + + logger.info("There are {} page files open", currentPages.size()); + Wait.assertTrue(() -> currentPages.size() <= 1, 10_000); + + producer.send(session.createTextMessage("on currentPage")); + session.commit(); + + PagingStore store = primaryServer.getServer().getPagingManager().getPageStore(SimpleString.toSimpleString(queueName)); + Page currentPage = store.getCurrentPage(); + logger.info("Page {}", currentPage.getPageId()); + + Page depaged = null; + for (; ; ) { + depaged = store.depage(); + if (depaged == null || currentPage.getPageId() == depaged.getPageId()) { + break; + } + logger.info("depage :: {} and currentPageID={}", depaged.getPageId(), currentPage.getPageId()); + } + + Assert.assertNotNull(depaged); + + logger.info("Depaged:: {}", depaged.getPageId()); + + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("on current page")); + session.commit(); + store.depage(); + } + + logger.info("Size:: {}", currentPages.size()); + + Wait.assertTrue(() -> currentPages.size() <= 1, 10_000); + + } + } + } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java index 3a13b4d3fa..e0b5bbdae7 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/checkTest/CheckTest.java @@ -69,13 +69,13 @@ public class CheckTest extends SmokeTestBase { { HelperCreate cliCreateServer = new HelperCreate(); - cliCreateServer.setSharedStore(true).setSlave(false).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61716").setArtemisInstance(server0Location); + cliCreateServer.setSharedStore(true).setBackup(false).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61716").setArtemisInstance(server0Location); cliCreateServer.createServer(); } { HelperCreate cliCreateServer = new HelperCreate(); - 
cliCreateServer.setSharedStore(true).setSlave(true).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61616").setPortOffset(100).setArtemisInstance(server1Location); + cliCreateServer.setSharedStore(true).setBackup(true).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61616").setPortOffset(100).setArtemisInstance(server1Location); cliCreateServer.createServer(); } } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java new file mode 100644 index 0000000000..3a1cea632b --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedMirrorTargetTest.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.io.File; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.TestParameters; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.actors.OrderedExecutor; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicatedMirrorTargetTest extends SoakTestBase { + + private static final String TEST_NAME = "REPLICATED_MIRROR_SOAK"; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's + private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false")); + private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_000); + + // By default consuming 90% of the messages + private static final int NUMBER_MESSAGES_RECEIVE = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES_RECEIVE", 2_000); + private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 200); + private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 200); + + // If -1 means to never kill the target broker + private static final int KILL_INTERVAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 1_000); + private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 300_000); + private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000); + + /* + * Time each consumer takes to process a message received to allow some messages accumulating. + * This sleep happens right before the commit. + */ + private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0); + + private static final String TOPIC_NAME = "topicTest"; + + private static String body; + + static { + StringWriter writer = new StringWriter(); + while (writer.getBuffer().length() < 30 * 1024) { + writer.append("The sky is blue, ..... 
watch out for poop from the birds though!..."); + } + body = writer.toString(); + } + + public static final String DC1_NODE = "ReplicatedMirrorTargetTest/DC1"; + public static final String DC2_NODE = "ReplicatedMirrorTargetTest/DC2"; + public static final String DC2_REPLICA_NODE = "ReplicatedMirrorTargetTest/DC2_REPLICA"; + + volatile Process processDC1; + volatile Process processDC2; + volatile Process processDC2_REPLICA; + + @After + public void destroyServers() throws Exception { + if (processDC1 != null) { + processDC1.destroyForcibly(); + processDC1.waitFor(1, TimeUnit.MINUTES); + processDC1 = null; + } + if (processDC2 != null) { + processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; + } + + if (processDC2_REPLICA != null) { + processDC2_REPLICA.destroyForcibly(); + processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES); + processDC2_REPLICA = null; + } + + } + + private static final String DC1_URI = "tcp://localhost:61616"; + private static final String DC2_URI = "tcp://localhost:61618"; + + private static final String DC2_REPLICA_URI = "tcp://localhost:61619"; + + private static void createServer(String serverName, + String connectionName, + String mirrorURI, + int porOffset, + boolean paging, + boolean replicated, + String clusterStatic) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setNoWeb(false); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.addArgs("--addresses", TOPIC_NAME); + cliCreateServer.setPortOffset(porOffset); + if (replicated) { + cliCreateServer.setReplicated(true); + cliCreateServer.setStaticCluster(clusterStatic); + cliCreateServer.setClustered(true); + } else { + cliCreateServer.setClustered(false); + } + + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI); + brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections." 
+ connectionName + ".connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + + if (paging) { + brokerProperties.put("addressSettings.#.maxSizeMessages", "1"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); + } + // if we don't use pageTransactions we may eventually get a few duplicates + brokerProperties.put("mirrorPageTransaction", "true"); + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + Assert.assertTrue(brokerXml.exists()); + // Adding redistribution delay to broker configuration + Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + + if (TRACE_LOGS) { + replaceLogs(serverLocation); + } + + } + + private static void replaceLogs(File serverLocation) throws Exception { + File log4j = new File(serverLocation, "/etc/log4j2.properties"); + Assert.assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + + "\n" + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + + "logger.endpoint.level=DEBUG\n" + + "appender.console.filter.threshold.type = ThresholdFilter\n" + + "appender.console.filter.threshold.level = info")); + } + + private static void createBackupServer(String serverName, + int porOffset, + String clusterStatic) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); + cliCreateServer.setNoWeb(false); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.addArgs("--addresses", TOPIC_NAME); + cliCreateServer.setPortOffset(porOffset); + cliCreateServer.setClustered(true); + cliCreateServer.setReplicated(true); + cliCreateServer.setBackup(true); + cliCreateServer.setStaticCluster(clusterStatic); + cliCreateServer.createServer(); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + Assert.assertTrue(brokerXml.exists()); + // Adding redistribution delay to broker configuration + Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + + if (TRACE_LOGS) { + replaceLogs(serverLocation); + } + } + + public static void createRealServers(boolean paging) throws Exception { + createServer(DC1_NODE, "mirror", DC2_URI, 0, paging, false, null); + createServer(DC2_NODE, "mirror", DC1_URI, 2, paging, true, DC2_REPLICA_URI); + createBackupServer(DC2_REPLICA_NODE, 3, DC2_URI); + } + + private void startServers() throws Exception { + processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties")); + processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties")); + + ServerUtil.waitForServerToStart(0, 10_000); + ServerUtil.waitForServerToStart(2, 10_000); + + processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties")); + + Thread.sleep(5000); + } + + @Test + public void testMirrorOnReplica() throws 
Exception { + createRealServers(true); + startServers(); + + + Assert.assertTrue(KILL_INTERVAL > SEND_COMMIT || KILL_INTERVAL < 0); + + String clientIDA = "nodeA"; + String clientIDB = "nodeB"; + String subscriptionID = "my-order"; + String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; + + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_URI); + + consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT); + consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT); + + SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null, null); + SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null, null); + + runAfter(() -> managementDC1.close()); + runAfter(() -> managementDC2.close()); + + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID)); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID)); + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID)); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID)); + + ExecutorService executorService = Executors.newFixedThreadPool(3); + runAfter(executorService::shutdownNow); + CountDownLatch consumerDone = new CountDownLatch(2); + executorService.execute(() -> { + try { + consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + consumerDone.countDown(); + } + }); + executorService.execute(() -> { + try { + consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + consumerDone.countDown(); + } + }); + + OrderedExecutor restartExeuctor = new OrderedExecutor(executorService); + AtomicBoolean running = new AtomicBoolean(true); + runAfter(() -> running.set(false)); + + try (Connection connection = connectionFactoryDC1A.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME)); + for (int i = 0; i < NUMBER_MESSAGES; i++) { + TextMessage message = session.createTextMessage(body); + message.setIntProperty("i", i); + message.setBooleanProperty("large", false); + producer.send(message); + if (i > 0 && i % SEND_COMMIT == 0) { + logger.info("Sent {} messages", i); + session.commit(); + } + if (KILL_INTERVAL > 0 && i > 0 && i % KILL_INTERVAL == 0) { + restartExeuctor.execute(() -> { + if (running.get()) { + try { + logger.info("Restarting target server (DC2)"); + if (processDC2_REPLICA != null) { + processDC2_REPLICA.destroyForcibly(); + processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES); + processDC2_REPLICA = null; + } + if (processDC2 != null) { + processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; + } + processDC2 = startServer(DC2_NODE, 2, 10_000, new File(getServerLocation(DC2_NODE), "broker.properties")); + processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + }); + } + } + session.commit(); + running.set(false); + } + + consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS); + + Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT); + Wait.assertEquals(0, () -> 
getCount(managementDC2, snfQueue), SNF_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT); + + destroyServers(); + + // counting the number of records on duplicate cache + // to validate if ARTEMIS-4765 is fixed + ActiveMQServer server = createServer(true, false); + server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal"); + server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings"); + server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging"); + server.start(); + server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000); + HashMap records = countJournal(server.getConfiguration()); + AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID); + Assert.assertNotNull(duplicateRecordsCount); + // 1000 credits by default + Assert.assertTrue(duplicateRecordsCount.get() <= 1000); + + } + + private static void consume(ConnectionFactory factory, + String clientID, + String subscriptionID, + int start, + int numberOfMessages, + boolean expectEmpty, + boolean assertBody, + int batchCommit) throws Exception { + try (Connection connection = factory.createConnection()) { + connection.setClientID(clientID); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + connection.start(); + MessageConsumer consumer = session.createDurableConsumer(topic, subscriptionID); + boolean failed = false; + + int pendingCommit = 0; + + for (int i = start; i < start + numberOfMessages; i++) { + TextMessage message = (TextMessage) consumer.receive(10_000); + Assert.assertNotNull(message); + logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large")); + if (message.getIntProperty("i") != i) { + failed = true; + logger.warn("Expected message {} but got {}", i, message.getIntProperty("i")); + } + logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large")); + pendingCommit++; + if (pendingCommit >= batchCommit) { + if (CONSUMER_PROCESSING_TIME > 0) { + Thread.sleep(CONSUMER_PROCESSING_TIME); + } + logger.info("received {}", i); + session.commit(); + pendingCommit = 0; + } + } + session.commit(); + + Assert.assertFalse(failed); + + if (expectEmpty) { + Assert.assertNull(consumer.receiveNoWait()); + } + } + } + + public long getCount(SimpleManagement simpleManagement, String queue) { + try { + long value = simpleManagement.getMessageCountOnQueue(queue); + logger.info("Queue {} count = {}", queue, value); + return value; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return -1; + } + } +}
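
A note for readers following the fix: the backup keeps every replicated page file open in a per-address index (the pageIndex exposed above through getPageIndex()) and releases a file only when the primary sends the corresponding page-event packet. The sketch below is illustrative only, using hypothetical names rather than the real Artemis classes, and shows why a close that never reaches the backup accumulates open file handles there, which is the leak this patch addresses by always sending the event (Page.close(true) on the paging store, and the finally block added to ReplicationManager).

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the backup's page index; not the real ReplicationEndpoint.
final class ReplicaPageIndexSketch {

   // address -> (pageId -> page file the backup currently holds open)
   private final Map<String, Map<Long, Closeable>> pageIndex = new ConcurrentHashMap<>();

   // Called for each replicated page write: the backup opens (or keeps) the file.
   void onPageWrite(String address, long pageId, Closeable openFile) {
      pageIndex.computeIfAbsent(address, a -> new ConcurrentHashMap<>())
               .putIfAbsent(pageId, openFile);
   }

   // Called only when the primary sends a "page closed" event. If the primary
   // closes its copy of the page without sending the event, the entry and its
   // file handle stay here until failover or restart: that is the leak.
   void onPageClosed(String address, long pageId) throws IOException {
      Map<Long, Closeable> pages = pageIndex.get(address);
      if (pages == null) {
         return;
      }
      Closeable file = pages.remove(pageId);
      if (file != null) {
         file.close();
      }
   }

   // What the new test observes on the backup: ideally only the current page
   // remains open for an address once close events are sent reliably.
   int openPages(String address) {
      Map<Long, Closeable> pages = pageIndex.get(address);
      return pages == null ? 0 : pages.size();
   }
}

This is also why testPagedInSync checks that getPageIndex() on the backup settles at no more than one entry for the queue's address: with the close events sent reliably, only the current page should remain open.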