From c523458a9aa4f67ad0e9bdbc5c4733bc88bf55f6 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 2 May 2024 10:28:42 -0400 Subject: [PATCH] ARTEMIS-4758 Hardening Mirroring This is a list of improvements done as part of this commit / task: * Page Transactions on mirror target are now optional. If you had an interrupt mirror while the target destination was paging, duplicate detection would be ineffective unless you used paged transactions Users can now configure the ack manager retries intervals. Say you need some time to remove a consumer from a target mirror. The delivering references would prevent acks from happening. You can allow bigger retry intervals and number of retries by tinkiering with ack manager retry parameters. * AckManager restarted independent of incoming acks The ackManager was only restarted when new acks were coming in. If you stopped receiving acks on a target server and restarted that server with pending acks, those acks would never be exercised. The AckManager is now restarted as soon as the server is started. --- .../server/ActiveMQScheduledComponent.java | 15 +- .../utils/ActiveMQScheduledComponentTest.java | 46 +++ .../config/ActiveMQDefaultConfiguration.java | 32 ++ .../journal/collections/JournalHashMap.java | 5 + .../broker/ProtonProtocolManagerFactory.java | 3 +- .../mirror/AMQPMirrorControllerSource.java | 3 + .../mirror/AMQPMirrorControllerTarget.java | 12 +- .../amqp/connect/mirror/AckManager.java | 80 +++-- .../connect/mirror/AckManagerProvider.java | 8 +- .../connect/mirror/MirrorTransaction.java | 11 + .../artemis/core/config/Configuration.java | 26 ++ .../core/config/impl/ConfigurationImpl.java | 58 ++++ .../impl/FileConfigurationParser.java | 16 + .../core/paging/impl/PagingStoreImpl.java | 5 +- .../impl/journal/codec/AckRetry.java | 14 +- .../artemis/core/transaction/Transaction.java | 4 + .../schema/artemis-configuration.xsd | 42 +++ .../config/impl/FileConfigurationTest.java | 5 + .../ConfigurationTest-full-config.xml | 6 + .../ConfigurationTest-xinclude-config.xml | 6 + ...nfigurationTest-xinclude-schema-config.xml | 7 + .../amqp/connect/AckManagerTest.java | 16 +- .../mirror/SingleMirrorSoakTest.java | 311 ++++++++++++++++++ .../soak-tests/src/test/scripts/parameters.sh | 9 +- 24 files changed, 670 insertions(+), 70 deletions(-) create mode 100644 tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java index 9918c1f405..3fe9b75812 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java @@ -212,15 +212,20 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R } public synchronized ActiveMQScheduledComponent setPeriod(long period) { - this.period = period; - restartIfNeeded(); + if (this.period != period) { + this.period = period; + restartIfNeeded(); + } return this; } public synchronized ActiveMQScheduledComponent setPeriod(long period, TimeUnit unit) { - this.period = period; - this.timeUnit = unit; - restartIfNeeded(); + if (unit == null) throw new NullPointerException("unit is required"); + if (this.period != period || this.timeUnit != unit) { + this.period = period; + this.timeUnit = unit; + restartIfNeeded(); + } return this; } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java index a80137da92..0f0417f01b 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java @@ -201,6 +201,52 @@ public class ActiveMQScheduledComponentTest { } } + + @Test + public void testUpdatePeriod() throws Throwable { + final ReusableLatch latch = new ReusableLatch(1); + + final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(10, TimeUnit.MILLISECONDS, true) { + @Override + public void run() { + latch.countDown(); + } + }; + + local.start(); + + try { + Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS)); + + latch.setCount(1); + local.delay(); + Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS)); + + latch.setCount(1); + Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS)); + + local.setPeriod(TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS); + + latch.setCount(1); + local.delay(); + Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS)); + + local.setPeriod(1); + local.delay(); + Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS)); + + local.setPeriod(1, TimeUnit.SECONDS); + + latch.setCount(1); + local.delay(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } finally { + local.stop(); + local.stop(); // calling stop again should not be an issue. + } + } + @Test public void testUsingCustomInitialDelay() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 48fb2b6b2a..0c84a13336 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -698,6 +698,17 @@ public final class ActiveMQDefaultConfiguration { private static final boolean DEFAULT_MANAGEMENT_MESSAGE_RBAC = false; + + // These properties used to defined with this prefix. + // I'm keeping the older property name in an attempt to guarantee compatibility + private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry"; + private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));; + private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));; + + private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));; + + private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false; + /** * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. */ @@ -1918,4 +1929,25 @@ public final class ActiveMQDefaultConfiguration { public static boolean getManagementMessagesRbac() { return DEFAULT_MANAGEMENT_MESSAGE_RBAC; } + + + /** This configures the Mirror Ack Manager number of attempts on queues before trying page acks. + * It is not intended to be configured through the XML. + * The default value here is 5. */ + public static int getMirrorAckManagerMinQueueAttempts() { + return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS; + } + + public static int getMirrorAckManagerMaxPageAttempts() { + return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS; + } + + public static int getMirrorAckManagerRetryDelay() { + return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY; + } + + public static boolean getDefaultMirrorPageTransaction() { + return DEFAULT_MIRROR_PAGE_TRANSACTION; + } + } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java index 4cbd17383a..f0d70340dd 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java @@ -37,6 +37,11 @@ import org.apache.activemq.artemis.core.persistence.Persister; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * K = Key + * V = Value + * C = Context + * */ public class JournalHashMap implements Map { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java index 21c5d2f444..378c64d098 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java @@ -87,7 +87,8 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory @Override public void loadProtocolServices(ActiveMQServer server, List services) { try { - AckManager ackManager = AckManagerProvider.getManager(server, false); + AckManager ackManager = AckManagerProvider.getManager(server); + services.add(ackManager); server.registerRecordsLoader(ackManager::reload); } catch (Exception e) { logger.warn(e.getMessage(), e); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 1b22567ad4..abee8a6220 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -419,6 +419,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } catch (Throwable e) { logger.warn(e.getMessage(), e); } + + snfQueue.deliverAsync(); } private void syncDone(MessageReference reference) { @@ -516,6 +518,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im postACKInternalMessage(ref); return; } + snfQueue.deliverAsync(); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index d55d6ed71a..4028f77262 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; @@ -161,6 +162,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement final ActiveMQServer server; + final Configuration configuration; + DuplicateIDCache lruduplicateIDCache; String lruDuplicateIDKey; @@ -183,6 +186,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement this.basicController = new BasicMirrorController(server); this.basicController.setLink(receiver); this.server = server; + this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); } @@ -389,8 +393,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } if (logger.isTraceEnabled()) { - logger.trace("Server {} with queue = {} being acked for {} coming from {} targetQueue = {}", - server.getIdentity(), queue, messageID, messageID, targetQueue); + logger.trace("Server {} with queue = {} being acked for {} from {} targetQueue = {} reason = {}", + server.getIdentity(), queue, messageID, ackMessage, targetQueue, reason); } performAck(nodeID, targetQueue, messageID, ackMessage, reason); @@ -407,7 +411,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } if (ackManager == null) { - ackManager = AckManagerProvider.getManager(server, true); + ackManager = AckManagerProvider.getManager(server); } ackManager.ack(nodeID, targetQueue, messageID, reason, true); @@ -473,7 +477,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement message.setAddress(internalAddress); } - final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAsync(true); + final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAllowPageTransaction(configuration.isMirrorPageTransaction()).setAsync(true); transaction.addOperation(messageCompletionAck.tx); routingContext.setTransaction(transaction); duplicateIDCache.addToCache(duplicateIDBytes, transaction); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 0b3b395d12..51f88edad8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -29,6 +29,7 @@ import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -58,12 +59,6 @@ import org.slf4j.LoggerFactory; public class AckManager implements ActiveMQComponent { // we first retry on the queue a few times - private static final short MIN_QUEUE_ATTEMPTS = Short.parseShort(System.getProperty(AckRetry.class.getName() + ".MIN_QUEUE_ATTEMPTS", "5")); - - private static final short MAX_PAGE_ATTEMPTS = Short.parseShort(System.getProperty(AckRetry.class.getName() + ".MAX_PAGE_ATTEMPT", "2")); - - public static final int RETRY_DELAY = Integer.parseInt(System.getProperty(AckRetry.class.getName() + ".RETRY_DELAY", "100")); - private static DisabledAckMirrorController disabledAckMirrorController = new DisabledAckMirrorController(); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -72,6 +67,7 @@ public class AckManager implements ActiveMQComponent { final LongSupplier sequenceGenerator; final JournalHashMapProvider journalHashMapProvider; final ActiveMQServer server; + final Configuration configuration; final ReferenceIDSupplier referenceIDSupplier; final IOCriticalErrorListener ioCriticalErrorListener; volatile MultiStepProgress progress; @@ -79,6 +75,7 @@ public class AckManager implements ActiveMQComponent { public AckManager(ActiveMQServer server) { this.server = server; + this.configuration = server.getConfiguration(); this.ioCriticalErrorListener = server.getIoCriticalErrorListener(); this.journal = server.getStorageManager().getMessageJournal(); this.sequenceGenerator = server.getStorageManager()::generateID; @@ -107,9 +104,11 @@ public class AckManager implements ActiveMQComponent { @Override public synchronized void start() { - logger.debug("Starting ACKManager on {} with period = {}", server, RETRY_DELAY); + if (logger.isDebugEnabled()) { + logger.debug("Starting ACKManager on {} with period = {}, minQueueAttempts={}, maxPageAttempts={}", server, configuration.getMirrorAckManagerRetryDelay(), configuration.getMirrorAckManagerQueueAttempts(), configuration.getMirrorAckManagerPageAttempts()); + } if (!isStarted()) { - scheduledComponent = new ActiveMQScheduledComponent(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), RETRY_DELAY, RETRY_DELAY, TimeUnit.MILLISECONDS, true) { + scheduledComponent = new ActiveMQScheduledComponent(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), server.getConfiguration().getMirrorAckManagerRetryDelay(), server.getConfiguration().getMirrorAckManagerRetryDelay(), TimeUnit.MILLISECONDS, true) { @Override public void run() { beginRetry(); @@ -203,20 +202,20 @@ public class AckManager implements ActiveMQComponent { // to be used with the same executor as the PagingStore executor - public boolean retryAddress(SimpleString address, LongObjectHashMap> queuesToRetry) { + public boolean retryAddress(SimpleString address, LongObjectHashMap> acksToRetry) { MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); boolean retriedPaging = false; - logger.debug("retrying address {} on server {}", address, server); + logger.trace("retrying address {} on server {}", address, server); try { AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController); - if (checkRetriesAndPaging(queuesToRetry)) { - logger.debug("scanning paging for {}", address); + if (checkRetriesAndPaging(acksToRetry)) { + logger.trace("scanning paging for {}", address); AckRetry key = new AckRetry(); PagingStore store = server.getPagingManager().getPageStore(address); for (long pageId = store.getFirstPage(); pageId <= store.getCurrentWritingPage(); pageId++) { - if (isEmpty(queuesToRetry)) { + if (isEmpty(acksToRetry)) { logger.trace("Retry stopped while reading page {} on address {} as the outcome is now empty, server={}", pageId, address, server); break; } @@ -225,16 +224,16 @@ public class AckManager implements ActiveMQComponent { continue; } try { - if (retryPage(queuesToRetry, page, key)) { + if (retryPage(acksToRetry, page, key)) { retriedPaging = true; } } finally { page.usageDown(); } } - validateExpiredSet(queuesToRetry); + validateExpiredSet(acksToRetry); } else { - logger.debug("Page Scan not required for address {}", address); + logger.trace("Page Scan not required for address {}", address); } } catch (Throwable e) { @@ -251,15 +250,15 @@ public class AckManager implements ActiveMQComponent { private void validateExpireSet(long queueID, JournalHashMap retries) { for (AckRetry retry : retries.valuesCopy()) { - if (retry.getQueueAttempts() >= MIN_QUEUE_ATTEMPTS) { - if (retry.attemptedPage() >= MAX_PAGE_ATTEMPTS) { + if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) { + if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) { if (logger.isDebugEnabled()) { logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts()); } retries.remove(retry); } else { if (logger.isDebugEnabled()) { - logger.debug("Retry {} attempted {} times on paging", retry, retry.getPageAttempts()); + logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts()); } } } @@ -283,14 +282,14 @@ public class AckManager implements ActiveMQComponent { } long id = referenceIDSupplier.getID(pagedMessage.getMessage()); - logger.debug("Looking for retry on serverID={}, id={} on server={}", serverID, id, server); + logger.trace("Looking for retry on serverID={}, id={} on server={}", serverID, id, server); key.setNodeID(serverID).setMessageID(id); - AckRetry foundRetry = retries.get(key); + AckRetry ackRetry = retries.get(key); // we first retry messages in the queue first. // this is to avoid messages that are in transit from being depaged into the queue - if (foundRetry != null && foundRetry.getQueueAttempts() > MIN_QUEUE_ATTEMPTS) { + if (ackRetry != null && ackRetry.getQueueAttempts() > configuration.getMirrorAckManagerQueueAttempts()) { Queue queue = retries.getContext(); if (queue != null) { @@ -307,9 +306,9 @@ public class AckManager implements ActiveMQComponent { } } } - retries.remove(foundRetry, transaction.getID()); + retries.remove(ackRetry, transaction.getID()); transaction.setContainsPersistent(); - logger.debug("retry found = {} for message={} on queue", foundRetry, pagedMessage); + logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage); } } } else { @@ -341,14 +340,14 @@ public class AckManager implements ActiveMQComponent { Queue queue = queueRetries.getContext(); for (AckRetry retry : queueRetries.valuesCopy()) { if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) { - logger.debug("Removing retry {} as the retry went ok", retry); + logger.trace("Removing retry {} as the retry went ok", retry); queueRetries.remove(retry); } else { int retried = retry.attemptedQueue(); - if (logger.isDebugEnabled()) { - logger.debug("retry {} attempted {} times on the queue", retry, retried); + if (logger.isTraceEnabled()) { + logger.trace("retry {} attempted {} times on the queue", retry, retried); } - if (retried >= MIN_QUEUE_ATTEMPTS) { + if (retried >= configuration.getMirrorAckManagerQueueAttempts()) { needScanOnPaging = true; } } @@ -365,6 +364,8 @@ public class AckManager implements ActiveMQComponent { AckRetry retry = new AckRetry(nodeID, messageID, reason); journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry); if (scheduledComponent != null) { + // we set the retry delay again in case it was changed. + scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay()); scheduledComponent.delay(); } } @@ -377,20 +378,29 @@ public class AckManager implements ActiveMQComponent { MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceIDSupplier); if (reference == null) { - logger.debug("Could not find retry on reference nodeID={} (while localID={}), messageID={} on queue {}, server={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server); + if (logger.isDebugEnabled()) { + logger.debug("ACK Manager could not find reference nodeID={} (while localID={}), messageID={} on queue {}, server={}. Adding retry with minQueue={}, maxPage={}, delay={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server, configuration.getMirrorAckManagerQueueAttempts(), configuration.getMirrorAckManagerPageAttempts(), configuration.getMirrorAckManagerRetryDelay()); + printQueueDebug(targetQueue); + } if (allowRetry) { addRetry(nodeID, targetQueue, messageID, reason); } return false; } else { if (logger.isTraceEnabled()) { - logger.trace("ack {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue); + logger.trace("ack worked well for messageID={} nodeID={} queue={}, reference={}", messageID, nodeID, reference.getQueue().getName(), reference); + if (reference.isPaged()) { + logger.trace("position for messageID={} = {}", messageID, ((PagedReference)reference).getPosition()); + } } doACK(targetQueue, reference, reason); return true; } } + private void printQueueDebug(Queue targetQueue) { + logger.debug("... queue {}/{} had {} consumers, {} messages, {} scheduled messages, {} delivering messages, paging={}", targetQueue.getID(), targetQueue.getName(), targetQueue.getConsumerCount(), targetQueue.getMessageCount(), targetQueue.getScheduledCount(), targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging()); + } private void doACK(Queue targetQueue, MessageReference reference, AckReason reason) { try { @@ -399,9 +409,12 @@ public class AckManager implements ActiveMQComponent { targetQueue.expire(reference, null, false); break; default: - TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true); + TransactionImpl transaction = new TransactionImpl(server.getStorageManager()); targetQueue.acknowledge(transaction, reference, reason, null, false); transaction.commit(); + if (logger.isTraceEnabled()) { + logger.trace("Transaction {} committed on acking reference {}", transaction.getID(), reference); + } break; } } catch (Exception e) { @@ -428,11 +441,6 @@ public class AckManager implements ActiveMQComponent { public void nextStep() { try { if (!retryIterator.hasNext()) { - if (retriedPaging) { - logger.debug("Retried acks on paging, better to rebuild the page counters"); - server.getPagingManager().rebuildCounters(null); - } - logger.trace("Iterator is done on retry, server={}", server); AckManager.this.endRetry(); } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java index 15f6075358..e23f0d69bc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java @@ -47,13 +47,10 @@ public class AckManagerProvider { } } - public static AckManager getManager(ActiveMQServer server, boolean start) { + public static AckManager getManager(ActiveMQServer server) { synchronized (managerHashMap) { AckManager ackManager = managerHashMap.get(server); if (ackManager != null) { - if (start && !ackManager.isStarted()) { - ackManager.start(); - } return ackManager; } @@ -64,9 +61,6 @@ public class AckManagerProvider { } catch (Exception e) { logger.warn(e.getMessage(), e); } - if (start) { - ackManager.start(); - } return ackManager; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java index 79508f3219..c1735e6d04 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java @@ -30,6 +30,8 @@ public class MirrorTransaction extends TransactionImpl { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + boolean allowPageTransaction; + MirrorController controlInUse; public MirrorTransaction(StorageManager storageManager) { @@ -49,4 +51,13 @@ public class MirrorTransaction extends TransactionImpl { } } + @Override + public boolean isAllowPageTransaction() { + return allowPageTransaction; + } + + public MirrorTransaction setAllowPageTransaction(boolean allowPageTransaction) { + this.allowPageTransaction = allowPageTransaction; + return this; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index edb56c3bd7..1df52bd0bc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1509,4 +1509,30 @@ public interface Configuration { void setManagementRbacPrefix(String prefix); + /** This configures the Mirror Ack Manager number of attempts on queues before trying page acks. + * The default value here is 5. */ + int getMirrorAckManagerQueueAttempts(); + + Configuration setMirrorAckManagerQueueAttempts(int queueAttempts); + + /** This configures the Mirror Ack Manager number of attempts on page acks. + * The default value here is 2. */ + int getMirrorAckManagerPageAttempts(); + + Configuration setMirrorAckManagerPageAttempts(int pageAttempts); + + /** This configures the interval in which the Mirror AckManager will retry acks when + * It is not intended to be configured through the XML. + * The default value here is 100, and this is in milliseconds. */ + int getMirrorAckManagerRetryDelay(); + + Configuration setMirrorAckManagerRetryDelay(int delay); + + /** Should Mirror use Page Transactions When target destinations is paging? + * When a target queue on the mirror is paged, the mirror will not record a page transaction for every message. + * The default is false, and the overhead of paged messages will be smaller, but there is a possibility of eventual duplicates in case of interrupted communication between the mirror source and target. + * If you set this to true there will be a record stored on the journal for the page-transaction additionally to the record in the page store. */ + boolean isMirrorPageTransaction(); + + Configuration setMirrorPageTransaction(boolean ignorePageTransactions); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 785cecce78..b99e7a53ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -438,6 +438,15 @@ public class ConfigurationImpl implements Configuration, Serializable { private boolean managementMessagesRbac = ActiveMQDefaultConfiguration.getManagementMessagesRbac(); + private int mirrorAckManagerMinQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts(); + + private int mirrorAckManagerMaxPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts(); + + private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay(); + + private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction(); + + /** * Parent folder for all data folders. */ @@ -3353,6 +3362,55 @@ public class ConfigurationImpl implements Configuration, Serializable { this.managementRbacPrefix = prefix; } + + @Override + public int getMirrorAckManagerQueueAttempts() { + return mirrorAckManagerMinQueueAttempts; + } + + @Override + public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) { + logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts); + this.mirrorAckManagerMinQueueAttempts = minQueueAttempts; + return this; + } + + @Override + public int getMirrorAckManagerPageAttempts() { + return this.mirrorAckManagerMaxPageAttempts; + } + + @Override + public ConfigurationImpl setMirrorAckManagerPageAttempts(int maxPageAttempts) { + logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}", maxPageAttempts); + this.mirrorAckManagerMaxPageAttempts = maxPageAttempts; + return this; + } + + @Override + public int getMirrorAckManagerRetryDelay() { + return mirrorAckManagerRetryDelay; + } + + @Override + public ConfigurationImpl setMirrorAckManagerRetryDelay(int delay) { + logger.debug("Setting mirrorAckManagerRetryDelay = {}", delay); + this.mirrorAckManagerRetryDelay = delay; + return this; + } + + @Override + public boolean isMirrorPageTransaction() { + return mirrorPageTransaction; + } + + @Override + public Configuration setMirrorPageTransaction(boolean ignorePageTransactions) { + logger.debug("Setting mirrorIgnorePageTransactions={}", ignorePageTransactions); + this.mirrorPageTransaction = ignorePageTransactions; + return this; + } + // extend property utils with ability to auto-fill and locate from collections // collection entries are identified by the name() property private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6b885e75d1..99012fd540 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -380,6 +380,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String ID_CACHE_SIZE = "id-cache-size"; + private static final String MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = "mirror-ack-manager-queue-attempts"; + + private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = "mirror-ack-manager-page-attempts"; + + private static final String MIRROR_ACK_MANAGER_RETRY_DELAY = "mirror-ack-manager-retry-delay"; + + private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction"; + private boolean validateAIO = false; private boolean printPageMaxSizeUsed = false; @@ -849,6 +857,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setManagementRbacPrefix(getString(e, "management-rbac-prefix", config.getManagementRbacPrefix(), NO_CHECK)); + config.setMirrorPageTransaction(getBoolean(e, MIRROR_PAGE_TRANSACTION, config.isMirrorPageTransaction())); + + config.setMirrorAckManagerPageAttempts(getInteger(e, MIRROR_ACK_MANAGER_PAGE_ATTEMPTS, config.getMirrorAckManagerPageAttempts(), GT_ZERO)); + + config.setMirrorAckManagerQueueAttempts(getInteger(e, MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS, config.getMirrorAckManagerQueueAttempts(), GT_ZERO)); + + config.setMirrorAckManagerRetryDelay(getInteger(e, MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(), GT_ZERO)); + parseAddressSettings(e, config); parseResourceLimits(e, config); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index dcf0e01412..3b60ef2f06 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1253,8 +1253,7 @@ public class PagingStoreImpl implements PagingStore { return false; } - // not using page transaction if transaction is declared async - final long transactionID = (tx == null || tx.isAsync()) ? -1 : tx.getID(); + final long transactionID = (tx != null && tx.isAllowPageTransaction()) ? tx.getID() : -1L; if (pageDecorator != null) { message = pageDecorator.apply(message); @@ -1273,7 +1272,7 @@ public class PagingStoreImpl implements PagingStore { currentPageSize += bytesToWrite; } - if (tx != null && !tx.isAsync()) { + if (tx != null && tx.isAllowPageTransaction()) { installPageTransaction(tx, listCtx); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java index 11bb71b6b1..2069769a95 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java @@ -30,8 +30,8 @@ public final class AckRetry { byte[] temporaryNodeBytes; long messageID; AckReason reason; - short pageAttempts; - short queueAttempts; + int pageAttempts; + int queueAttempts; private static Persister persister = new Persister(); @@ -41,7 +41,7 @@ public final class AckRetry { @Override public String toString() { - return "ACKRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + messageID + ", reason=" + reason + '}'; + return "AckRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + messageID + ", reason=" + reason + ", pageAttempts=" + pageAttempts + ", queueAttempts=" + queueAttempts + '}'; } public AckRetry() { @@ -92,19 +92,19 @@ public final class AckRetry { return this; } - public short getPageAttempts() { + public int getPageAttempts() { return pageAttempts; } - public short getQueueAttempts() { + public int getQueueAttempts() { return queueAttempts; } - public short attemptedPage() { + public int attemptedPage() { return ++pageAttempts; } - public short attemptedQueue() { + public int attemptedQueue() { return ++queueAttempts; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java index befaaf3405..0a64aa1e90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java @@ -109,4 +109,8 @@ public interface Transaction { /** To be used on control transactions that are meant as internal and don't really require a hard sync. */ Transaction setAsync(boolean async); + + default boolean isAllowPageTransaction() { + return true; + } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index d3779266bd..b0c8116257 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -915,6 +915,48 @@ + + + + The number of times a mirror target would retry an acknowledgement on the queue before scanning page files for the message. + + This is exposed as mirrorAckManagerQueueAttempts on broker properties. + + + + + + + + The number of times a mirror target would retry an acknowledgement on paging. + + This is exposed as mirrorAckManagerPageAttempts on broker properties. + + + + + + + + Period in milliseconds for which retries are going to be exercised. + This is exposed as mirrorAckManagerRetryDelay on broker properties. + + + + + + + + Should Mirror use Page Transactions When target destinations is paging? + When a target queue on the mirror is paged, the mirror will not record a page transaction for every message. + The default is false, and the overhead of paged messages will be smaller, but there is a possibility of eventual duplicates in case of interrupted communication between the mirror source and target. + If you set this to true there will be a record stored on the journal for the page-transaction additionally to the record in the page store. + + This is exposed as mirrorPageTransactions on broker properties. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 2b1da86de2..84a3273ad9 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -507,6 +507,11 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp()); assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize()); + Assert.assertEquals(111, conf.getMirrorAckManagerQueueAttempts()); + Assert.assertEquals(222, conf.getMirrorAckManagerPageAttempts()); + Assert.assertEquals(333, conf.getMirrorAckManagerRetryDelay()); + Assert.assertTrue(conf.isMirrorPageTransaction()); + assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); assertEquals(13, conf.getResourceLimitSettings().get("myUser").getMaxQueues()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 137ddfa4c6..01adc7fe8f 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -541,6 +541,12 @@ ping-four ping-six 1000 + + 111 + 222 + 333 + true + diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 6193b1086c..f22ba09f28 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -70,6 +70,12 @@ 777 false () + + 111 + 222 + 333 + true + org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1 org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml index 1adab32879..1c163b4a14 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml @@ -70,6 +70,13 @@ 777 false () + + 111 + 222 + 333 + true + + true diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java index 8792f2e7f9..077764b4e4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java @@ -136,7 +136,7 @@ public class AckManagerTest extends ActiveMQTestBase { ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1); { - AckManager ackManager = AckManagerProvider.getManager(server1, false); + AckManager ackManager = AckManagerProvider.getManager(server1); AtomicInteger counter = new AtomicInteger(0); @@ -161,7 +161,8 @@ public class AckManagerTest extends ActiveMQTestBase { // in this following loop we will get the ackManager, compare the stored retries. stop the server and validate if they were reloaded correctly for (int repeat = 0; repeat < 2; repeat++) { logger.info("Repeating {}", repeat); - AckManager ackManager = AckManagerProvider.getManager(server1, true); + AckManager ackManager = AckManagerProvider.getManager(server1); + ackManager.start(); HashMap>> sortedRetries = ackManager.sortRetries(); @@ -179,19 +180,22 @@ public class AckManagerTest extends ActiveMQTestBase { Wait.assertEquals(numberOfMessages, c1s1::getMessageCount); Wait.assertEquals(numberOfMessages, c2s2::getMessageCount); - AckManager originalManager = AckManagerProvider.getManager(server1, false); + AckManager originalManager = AckManagerProvider.getManager(server1); server1.stop(); Assert.assertEquals(0, AckManagerProvider.getSize()); server1.start(); - AckManager newManager = AckManagerProvider.getManager(server1, false); + AckManager newManager = AckManagerProvider.getManager(server1); Assert.assertEquals(1, AckManagerProvider.getSize()); - Assert.assertNotSame(originalManager, AckManagerProvider.getManager(server1, true)); + Assert.assertNotSame(originalManager, AckManagerProvider.getManager(server1)); + AckManager manager = AckManagerProvider.getManager(server1); + Wait.assertTrue(manager::isStarted, 5_000); Assert.assertEquals(1, AckManagerProvider.getSize()); Assert.assertNotSame(newManager, ackManager); } - AckManager ackManager = AckManagerProvider.getManager(server1, true); + AckManager ackManager = AckManagerProvider.getManager(server1); + ackManager.start(); HashMap>> sortedRetries = ackManager.sortRetries(); Assert.assertEquals(1, sortedRetries.size()); LongObjectHashMap> acksOnAddress = sortedRetries.get(c1s1.getAddress()); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java new file mode 100644 index 0000000000..c93d419ff9 --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.io.File; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.TestParameters; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.actors.OrderedExecutor; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SingleMirrorSoakTest extends SoakTestBase { + + private static final String TEST_NAME = "SINGLE_MIRROR_SOAK"; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's + private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false")); + private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500); + private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100); + private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100); + private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500); + private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000); + + private static final String TOPIC_NAME = "topicTest"; + + private static String body; + + static { + StringWriter writer = new StringWriter(); + while (writer.getBuffer().length() < 30 * 1024) { + writer.append("The sky is blue, ..... watch out for poop from the birds though!..."); + } + body = writer.toString(); + } + + public static final String DC1_NODE = "SingleMirrorSoakTest/DC1"; + public static final String DC2_NODE = "SingleMirrorSoakTest/DC2"; + + volatile Process processDC1; + volatile Process processDC2; + + @After + public void destroyServers() { + if (processDC1 != null) { + processDC1.destroyForcibly(); + } + if (processDC2 != null) { + processDC2.destroyForcibly(); + } + + } + + private static final String DC1_URI = "tcp://localhost:61616"; + private static final String DC2_URI = "tcp://localhost:61618"; + + private static void createServer(String serverName, + String connectionName, + String mirrorURI, + int porOffset, + boolean paging) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); + cliCreateServer.setClustered(false); + cliCreateServer.setNoWeb(false); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.addArgs("--addresses", TOPIC_NAME); + cliCreateServer.setPortOffset(porOffset); + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI); + brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + brokerProperties.put("mirrorAckManagerPageAttempts", "10"); + brokerProperties.put("mirrorAckManagerRetryDelay", "1000"); + // if we don't use pageTransactions we may eventually get a few duplicates + brokerProperties.put("mirrorPageTransaction", "true"); + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + Assert.assertTrue(brokerXml.exists()); + // Adding redistribution delay to broker configuration + Assert.assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + if (paging) { + Assert.assertTrue(FileUtil.findReplace(brokerXml, "-1", "1")); + Assert.assertTrue(FileUtil.findReplace(brokerXml, "20M", "-1")); + Assert.assertTrue(FileUtil.findReplace(brokerXml, "-1", "100000\n" + " 10000")); + } + + if (TRACE_LOGS) { + File log4j = new File(serverLocation, "/etc/log4j2.properties"); + Assert.assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + + "\n" + "logger.ack.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" + + "logger.ack.level=TRACE\n" + + "logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n" + + "logger.config.level=TRACE\n" + + "appender.console.filter.threshold.type = ThresholdFilter\n" + + "appender.console.filter.threshold.level = info")); + } + + } + + public static void createRealServers(boolean paging) throws Exception { + createServer(DC1_NODE, "mirror", DC2_URI, 0, paging); + createServer(DC2_NODE, "mirror", DC1_URI, 2, paging); + } + + private void startServers() throws Exception { + processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties")); + processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties")); + + ServerUtil.waitForServerToStart(0, 10_000); + ServerUtil.waitForServerToStart(2, 10_000); + } + + @Test + public void testInterruptedMirrorTransfer() throws Exception { + createRealServers(true); + startServers(); + + + Assert.assertTrue(KILL_INTERNAL > SEND_COMMIT); + + String clientIDA = "nodeA"; + String clientIDB = "nodeB"; + String subscriptionID = "my-order"; + String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; + + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_URI); + + consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT); + consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT); + + SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null, null); + SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null, null); + + runAfter(() -> managementDC1.close()); + runAfter(() -> managementDC2.close()); + + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID)); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID)); + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID)); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID)); + + ExecutorService executorService = Executors.newFixedThreadPool(3); + runAfter(executorService::shutdownNow); + executorService.execute(() -> { + try { + consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); + executorService.execute(() -> { + try { + consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); + + OrderedExecutor restartExeuctor = new OrderedExecutor(executorService); + AtomicBoolean running = new AtomicBoolean(true); + runAfter(() -> running.set(false)); + + try (Connection connection = connectionFactoryDC1A.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME)); + for (int i = 0; i < NUMBER_MESSAGES; i++) { + TextMessage message = session.createTextMessage(body); + message.setIntProperty("i", i); + message.setBooleanProperty("large", false); + producer.send(message); + if (i > 0 && i % SEND_COMMIT == 0) { + logger.info("Sent {} messages", i); + session.commit(); + } + if (i > 0 && i % KILL_INTERNAL == 0) { + restartExeuctor.execute(() -> { + if (running.get()) { + try { + logger.info("Restarting target server (DC2)"); + if (processDC2 != null) { + processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; + } + processDC2 = startServer(DC2_NODE, 2, 10_000, new File(getServerLocation(DC2_NODE), "broker.properties")); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + }); + } + } + session.commit(); + running.set(false); + } + + Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT); + Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT); + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000); + Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000); + Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000); + } + + private static void consume(ConnectionFactory factory, + String clientID, + String subscriptionID, + int start, + int numberOfMessages, + boolean expectEmpty, + boolean assertBody, + int batchCommit) throws Exception { + try (Connection connection = factory.createConnection()) { + connection.setClientID(clientID); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + connection.start(); + MessageConsumer consumer = session.createDurableConsumer(topic, subscriptionID); + boolean failed = false; + + int pendingCommit = 0; + + for (int i = start; i < start + numberOfMessages; i++) { + TextMessage message = (TextMessage) consumer.receive(10_000); + Assert.assertNotNull(message); + logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large")); + if (message.getIntProperty("i") != i) { + failed = true; + logger.warn("Expected message {} but got {}", i, message.getIntProperty("i")); + } + logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large")); + pendingCommit++; + if (pendingCommit >= batchCommit) { + logger.info("received {}", i); + session.commit(); + pendingCommit = 0; + } + } + session.commit(); + + Assert.assertFalse(failed); + + if (expectEmpty) { + Assert.assertNull(consumer.receiveNoWait()); + } + } + } + + public long getCount(SimpleManagement simpleManagement, String queue) throws Exception { + try { + long value = simpleManagement.getMessageCountOnQueue(queue); + logger.debug("count on queue {} is {}", queue, value); + return value; + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return -1; + } + } +} diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/parameters.sh index 62908b885b..ddbe3aa2e6 100755 --- a/tests/soak-tests/src/test/scripts/parameters.sh +++ b/tests/soak-tests/src/test/scripts/parameters.sh @@ -141,4 +141,11 @@ export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3 export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200 -export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10 \ No newline at end of file +export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10 + +export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false +export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500 +export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100 +export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100 +export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500 +export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000 \ No newline at end of file