diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index b8500f153e..d8bf42940d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -710,8 +710,8 @@ public final class ActiveMQDefaultConfiguration { // These properties used to defined with this prefix. // I'm keeping the older property name in an attempt to guarantee compatibility private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry"; - private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));; - private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));; + private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));; + private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));; private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));; @@ -1953,19 +1953,19 @@ public final class ActiveMQDefaultConfiguration { /** This configures the Mirror Ack Manager number of attempts on queues before trying page acks. * It is not intended to be configured through the XML. * The default value here is 5. */ - public static int getMirrorAckManagerMinQueueAttempts() { - return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS; + public static int getMirrorAckManagerQueueAttempts() { + return DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS; } - public static int getMirrorAckManagerMaxPageAttempts() { - return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS; + public static int getMirrorAckManagerPageAttempts() { + return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS; } public static int getMirrorAckManagerRetryDelay() { return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY; } - public static boolean getDefaultMirrorPageTransaction() { + public static boolean getMirrorPageTransaction() { return DEFAULT_MIRROR_PAGE_TRANSACTION; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/OperationConsistencyLevel.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/OperationConsistencyLevel.java new file mode 100644 index 0000000000..4a1f337725 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/OperationConsistencyLevel.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io; + +public enum OperationConsistencyLevel { + FULL, STORAGE, IGNORE_REPLICATION +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java index f1339588c7..bd4f1ab526 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java @@ -232,6 +232,9 @@ public class JournalHashMap implements Map { // callers must be synchronized private void removed(MapRecord record) { + if (logger.isTraceEnabled()) { + logger.trace("Removing record {}", record); + } try { journal.deleteMapRecord(record.id, false); } catch (Exception e) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index cf1c3ac641..b6d59ec4ce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -702,8 +702,9 @@ public class AMQPSessionCallback implements SessionCallback { storageManager.setContext(oldContext); } + /** Set the proper operation context in the Thread Local. + * Return the old context*/ public OperationContext recoverContext() { - OperationContext oldContext = storageManager.getContext(); manager.getServer().getStorageManager().setContext(serverSession.getSessionContext()); return oldContext; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index edc73d62e0..3517e0cdbd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -261,10 +262,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement @Override protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) { - recoverContext(); + OperationContext oldContext = recoverContext(); incrementSettle(); - logger.trace("{}::actualdelivery call for {}", server, message); + logger.trace("{}::actualDelivery call for {}", server, message); setControllerInUse(this); delivery.setContext(message); @@ -337,8 +338,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } finally { setControllerInUse(null); if (messageAckOperation != null) { - server.getStorageManager().afterCompleteOperations(messageAckOperation); + server.getStorageManager().afterCompleteOperations(messageAckOperation, OperationConsistencyLevel.FULL); } + + OperationContextImpl.setContext(oldContext); } } @@ -475,7 +478,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement ackManager.ack(nodeID, targetQueue, messageID, reason, true); - OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation); + OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL); } /** @@ -536,8 +539,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement message.setAddress(internalAddress); } + // notice that MirrorTransaction is overriding getRequiredConsistency that is being set to ignore Replication. + // that means in case the target server is using replication, we will not wait for a roundtrip before the message is sent + // however we will wait the roundtrip before acking the message + // This is to alleviate a situation where messages would take too long to be delivered and be ready for ack final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAllowPageTransaction(configuration.isMirrorPageTransaction()).setAsync(true); - transaction.addOperation(messageCompletionAck.tx); routingContext.setTransaction(transaction); duplicateIDCache.addToCache(duplicateIDBytes, transaction); @@ -550,6 +556,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } // We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically transaction.commit(); + server.getStorageManager().afterCompleteOperations(messageCompletionAck, OperationConsistencyLevel.FULL); flow(); // return true here will instruct the caller to ignore any references to messageCompletionAck @@ -583,4 +590,4 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement // Do nothing } -} +} \ No newline at end of file diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 3668935a13..6328808f7d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.RejectedExecutionException; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -139,7 +140,14 @@ public class AckManager implements ActiveMQComponent { // schedule a retry if (!sortRetries().isEmpty()) { - scheduledComponent.delay(); + ActiveMQScheduledComponent scheduleComponentReference = scheduledComponent; + if (scheduleComponentReference != null) { + try { + scheduleComponentReference.delay(); + } catch (RejectedExecutionException ree) { + logger.debug("AckManager could not schedule a new retry due to the executor being shutdown {}", ree.getMessage(), ree); + } + } } } @@ -277,12 +285,12 @@ public class AckManager implements ActiveMQComponent { if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) { if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) { if (logger.isDebugEnabled()) { - logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts()); + logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } retries.remove(retry); } else { if (logger.isDebugEnabled()) { - logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts()); + logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java index c1735e6d04..114cf9ad6f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect.mirror; import java.util.List; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.mirror.MirrorController; import org.apache.activemq.artemis.core.transaction.TransactionOperation; @@ -26,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +/** MirrorTransaction disable some syncs in storage, and plays with OperationConsistencyLevel to relax some of the syncs required for Mirroring. */ public class MirrorTransaction extends TransactionImpl { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -60,4 +62,9 @@ public class MirrorTransaction extends TransactionImpl { this.allowPageTransaction = allowPageTransaction; return this; } + + @Override + protected OperationConsistencyLevel getRequiredConsistency() { + return OperationConsistencyLevel.IGNORE_REPLICATION; + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 9dbfe5406d..188eb9eec0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.lang.invoke.MethodHandles; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; @@ -91,8 +92,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme return protonSession; } - protected void recoverContext() { - sessionSPI.recoverContext(); + /** Set the proper operation context in the Thread Local. + * Return the old context*/ + protected OperationContext recoverContext() { + return sessionSPI.recoverContext(); } protected void closeCurrentReader() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 2a532d16bb..4983527b6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -440,13 +440,13 @@ public class ConfigurationImpl implements Configuration, Serializable { private boolean managementMessagesRbac = ActiveMQDefaultConfiguration.getManagementMessagesRbac(); - private int mirrorAckManagerMinQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts(); + private int mirrorAckManagerQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerQueueAttempts(); - private int mirrorAckManagerMaxPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts(); + private int mirrorAckManagerPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts(); private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay(); - private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction(); + private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction(); /** @@ -941,6 +941,9 @@ public class ConfigurationImpl implements Configuration, Serializable { // Identify the property name and value(s) to be assigned final String name = entry.getKey(); try { + if (logger.isDebugEnabled()) { + logger.debug("set property target={}, name = {}, value = {}", target.getClass(), name, entry.getValue()); + } // Perform the assignment for this property beanUtils.setProperty(target, name, entry.getValue()); } catch (InvocationTargetException invocationTargetException) { @@ -3379,25 +3382,25 @@ public class ConfigurationImpl implements Configuration, Serializable { @Override public int getMirrorAckManagerQueueAttempts() { - return mirrorAckManagerMinQueueAttempts; + return mirrorAckManagerQueueAttempts; } @Override public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) { logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts); - this.mirrorAckManagerMinQueueAttempts = minQueueAttempts; + this.mirrorAckManagerQueueAttempts = minQueueAttempts; return this; } @Override public int getMirrorAckManagerPageAttempts() { - return this.mirrorAckManagerMaxPageAttempts; + return this.mirrorAckManagerPageAttempts; } @Override public ConfigurationImpl setMirrorAckManagerPageAttempts(int maxPageAttempts) { logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}", maxPageAttempts); - this.mirrorAckManagerMaxPageAttempts = maxPageAttempts; + this.mirrorAckManagerPageAttempts = maxPageAttempts; return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java index dd7276fdfc..6a7d0a9c4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.persistence; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.journal.IOCompletion; /** @@ -29,13 +30,9 @@ public interface OperationContext extends IOCompletion { /** * Execute the task when all IO operations are complete, * Or execute it immediately if nothing is pending. - * - * @param runnable the tas to be executed. - * @param storeOnly There are tasks that won't need to wait on replication or paging and will need to - * be completed as soon as the response from the journal is received. An example would be the - * DuplicateCache + * Notice it's possible to pass a consistencyLevel to what should be waited before completing the operation. */ - void executeOnCompletion(IOCallback runnable, boolean storeOnly); + void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel); /** * Execute the task when all IO operations are complete, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 6f0740a278..d2e1746780 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; @@ -135,6 +136,7 @@ public interface StorageManager extends MapStorageManager, IDGenerator, ActiveMQ void afterCompleteOperations(IOCallback run); + void afterCompleteOperations(IOCallback run, OperationConsistencyLevel consistencyLevel); /** * This is similar to afterComplete, however this only cares about the journal part. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index a9cbcb5e77..4b552b3da7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; @@ -354,9 +355,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp getContext().executeOnCompletion(run); } + @Override + public void afterCompleteOperations(final IOCallback run, OperationConsistencyLevel consistencyLevel) { + getContext().executeOnCompletion(run, consistencyLevel); + } + @Override public void afterStoreOperations(IOCallback run) { - getContext().executeOnCompletion(run, true); + getContext().executeOnCompletion(run, OperationConsistencyLevel.STORAGE); } @Override @@ -2120,8 +2126,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { - executeOnCompletion(runnable); + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) { + // There are no executeOnCompletion calls while using the DummyOperationContext + // However we keep the code here for correctness + runnable.done(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java index f4883c8cc5..d5ed6123d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.OperationContext; public final class DummyOperationContext implements OperationContext { @@ -35,7 +36,7 @@ public final class DummyOperationContext implements OperationContext { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) { // There are no executeOnCompletion calls while using the DummyOperationContext // However we keep the code here for correctness runnable.done(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index af61e206cc..d0635adb3d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -75,13 +76,9 @@ public class OperationContextImpl implements OperationContext { } LinkedList tasks; + LinkedList ignoreReplicationTasks; LinkedList storeOnlyTasks; - long minimalStore = Long.MAX_VALUE; - long minimalReplicated = Long.MAX_VALUE; - long minimalPage = Long.MAX_VALUE; - - static final AtomicIntegerFieldUpdater EXECUTORS_PENDING_UPDATER = AtomicIntegerFieldUpdater .newUpdater(OperationContextImpl.class, "executorsPendingField"); @@ -94,6 +91,29 @@ public class OperationContextImpl implements OperationContext { static final AtomicLongFieldUpdater PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater .newUpdater(OperationContextImpl.class, "pageLineUpField"); + public long getReplicationLineUpField() { + return replicationLineUpField; + } + + public long getReplicated() { + return replicated; + } + + public long getStoreLineUpField() { + return storeLineUpField; + } + + public long getStored() { + return stored; + } + + public long getPagedLinedUpField() { + return pageLineUpField; + } + + public long getPaged() { + return paged; + } volatile int executorsPendingField = 0; volatile long storeLineUpField = 0; @@ -165,11 +185,11 @@ public class OperationContextImpl implements OperationContext { @Override public void executeOnCompletion(IOCallback runnable) { - executeOnCompletion(runnable, false); + executeOnCompletion(runnable, OperationConsistencyLevel.FULL); } @Override - public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) { + public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) { boolean executeNow = false; synchronized (this) { @@ -177,44 +197,64 @@ public class OperationContextImpl implements OperationContext { final long storeLined = STORE_LINEUP_UPDATER.get(this); final long pageLined = PAGE_LINEUP_UPDATER.get(this); final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this); - if (storeOnly) { - if (storeOnlyTasks == null) { - storeOnlyTasks = new LinkedList<>(); - } - } else { - if (tasks == null) { - tasks = new LinkedList<>(); - minimalReplicated = replicationLined; - minimalStore = storeLined; - minimalPage = pageLined; - } - } - // On this case, we can just execute the context directly - - if (replicationLined == replicated && storeLined == stored && pageLined == paged) { - // We want to avoid the executor if everything is complete... - // However, we can't execute the context if there are executions pending - // We need to use the executor on this case - if (EXECUTORS_PENDING_UPDATER.get(this) == 0) { - // No need to use an executor here or a context switch - // there are no actions pending.. hence we can just execute the task directly on the same thread - executeNow = true; - } else { - execute(completion); - } - } else { - if (storeOnly) { - if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) { - executeNow = true; + switch (consistencyLevel) { + case STORAGE: + if (storeOnlyTasks == null) { + storeOnlyTasks = new LinkedList<>(); + } + if (storeLined == stored) { + if (hasNoPendingExecution()) { + // setting executeNow = true will make the completion to be called within the same thread here + // without using an executor + executeNow = true; + } else { + execute(completion); + } } else { - assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); } - } else { - // ensure total ordering - assert validateTasksAdd(storeLined, replicationLined, pageLined); - tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); - } + break; + + case IGNORE_REPLICATION: + if (ignoreReplicationTasks == null) { + ignoreReplicationTasks = new LinkedList<>(); + } + + if (storeLined == stored && pageLined == paged) { + if (hasNoPendingExecution()) { + // setting executeNow = true will make the completion to be called within the same thread here + // without using an executor + executeNow = true; + } else { + execute(completion); + } + } else { + ignoreReplicationTasks.add(new IgnoreReplicationTaskHolder(completion, storeLined, pageLined)); + } + + break; + + case FULL: + if (tasks == null) { + tasks = new LinkedList<>(); + } + + if (replicationLined == replicated && storeLined == stored && pageLined == paged) { + // We want to avoid the executor if everything is complete... + // However, we can't execute the context if there are executions pending + // We need to use the executor on this case + if (hasNoPendingExecution()) { + // setting executeNow = true will make the completion to be called within the same thread here + // without using an executor + executeNow = true; + } else { + execute(completion); + } + } else { + tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined)); + } + + break; } } } @@ -228,17 +268,8 @@ public class OperationContextImpl implements OperationContext { } - private boolean validateTasksAdd(long storeLined, long replicationLined, long pageLined) { - if (tasks.isEmpty()) { - return true; - } - final TaskHolder holder = tasks.peekLast(); - if (holder.storeLined > storeLined || - holder.replicationLined > replicationLined || - holder.pageLined > pageLined) { - return false; - } - return true; + private boolean hasNoPendingExecution() { + return EXECUTORS_PENDING_UPDATER.get(this) == 0; } @Override @@ -273,14 +304,13 @@ public class OperationContextImpl implements OperationContext { } } - private void checkCompleteContext() { + private void checkRegularCompletion() { final LinkedList tasks = this.tasks; assert tasks != null; final int size = this.tasks.size(); if (size == 0) { return; } - assert size >= 1; // no need to use an iterator here, we can save that cost for (int i = 0; i < size; i++) { final TaskHolder holder = tasks.peek(); @@ -294,14 +324,37 @@ public class OperationContextImpl implements OperationContext { } } + private void checkIgnoreReplicationCompletion() { + final LinkedList tasks = this.ignoreReplicationTasks; + assert tasks != null; + final int size = tasks.size(); + if (size == 0) { + return; + } + for (int i = 0; i < size; i++) { + final IgnoreReplicationTaskHolder holder = tasks.peek(); + if (stored < holder.storeLined || paged < holder.pageLined) { + // End of list here. No other task will be completed after this + return; + } + execute(holder.task); + final IgnoreReplicationTaskHolder removed = tasks.poll(); + assert removed == holder; + } + } + private void checkTasks() { - if (storeOnlyTasks != null) { + if (storeOnlyTasks != null && !storeOnlyTasks.isEmpty()) { checkStoreTasks(); } - if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) { - checkCompleteContext(); + if (tasks != null && !tasks.isEmpty()) { + checkRegularCompletion(); + } + + if (ignoreReplicationTasks != null && !ignoreReplicationTasks.isEmpty()) { + checkIgnoreReplicationCompletion(); } } @@ -377,6 +430,30 @@ public class OperationContextImpl implements OperationContext { } } + + static final class IgnoreReplicationTaskHolder { + @Override + public String toString() { + return "IgnoreReplicationTaskHolder [storeLined=" + storeLined + + ", pageLined=" + + pageLined + + ", task=" + + task + + "]"; + } + + long storeLined; + long pageLined; + + final IOCallback task; + + IgnoreReplicationTaskHolder(final IOCallback task, long storeLined, long pageLined) { + this.storeLined = storeLined; + this.pageLined = pageLined; + this.task = task; + } + } + /** * This class has been created to both better capture the intention that the {@link IOCallback} is related to a * store-only operation and to reduce the memory footprint for store-only cases, given that many fields of @@ -419,21 +496,17 @@ public class OperationContextImpl implements OperationContext { @Override public String toString() { - return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + minimalStore + + return "OperationContextImpl@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ", storeLineUp=" + storeLineUpField + ", stored=" + stored + - ", minimalReplicated=" + - minimalReplicated + ", replicationLineUp=" + replicationLineUpField + ", replicated=" + replicated + ", paged=" + paged + - ", minimalPage=" + - minimalPage + ", pageLineUp=" + pageLineUpField + ", errorCode=" + @@ -449,11 +522,9 @@ public class OperationContextImpl implements OperationContext { public synchronized void reset() { stored = 0; storeLineUpField = 0; - minimalReplicated = 0; replicated = 0; replicationLineUpField = 0; paged = 0; - minimalPage = 0; pageLineUpField = 0; errorCode = -1; errorMessage = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 32559ecf47..2697ef53da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; @@ -156,7 +157,7 @@ public class NullStorageManager implements StorageManager { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) { runnable.done(); } @@ -425,6 +426,11 @@ public class NullStorageManager implements StorageManager { run.done(); } + @Override + public void afterCompleteOperations(IOCallback run, OperationConsistencyLevel consistencyLevel) { + run.done(); + } + @Override public void afterStoreOperations(IOCallback run) { run.done(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 7cf8fd4800..aaf1b2dbfb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQTransactionTimeoutException; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; @@ -257,6 +258,10 @@ public class TransactionImpl implements Transaction { } } + protected OperationConsistencyLevel getRequiredConsistency() { + return OperationConsistencyLevel.FULL; + } + @Override public void commit(final boolean onePhase) throws Exception { logger.trace("TransactionImpl::commit::{}", this); @@ -313,7 +318,7 @@ public class TransactionImpl implements Transaction { public void done() { afterCommit(operationsToComplete); } - }); + }, getRequiredConsistency()); final List storeOperationsToComplete = this.storeOperations; this.storeOperations = null; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java index b9471e2713..37cc18b4c3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.tests.util.ServerTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Wait; @@ -119,7 +121,7 @@ public class OperationContextUnitTest extends ServerTestBase { public void done() { latch1.countDown(); } - }, true); + }, OperationConsistencyLevel.STORAGE); impl.storeLineUp(); @@ -133,7 +135,7 @@ public class OperationContextUnitTest extends ServerTestBase { public void done() { latch3.countDown(); } - }, true); + }, OperationConsistencyLevel.STORAGE); impl.done(); @@ -158,7 +160,7 @@ public class OperationContextUnitTest extends ServerTestBase { public void done() { latch2.countDown(); } - }, true); + }, OperationConsistencyLevel.STORAGE); assertFalse(latch2.await(1, TimeUnit.MILLISECONDS)); @@ -179,22 +181,22 @@ public class OperationContextUnitTest extends ServerTestBase { @Test public void testCompletionLateStoreOnly() throws Exception { - testCompletionLate(true); + testCompletionLate(OperationConsistencyLevel.STORAGE); } @Test public void testCompletionLate() throws Exception { - testCompletionLate(false); + testCompletionLate(OperationConsistencyLevel.FULL); } - private void testCompletionLate(boolean storeOnly) throws Exception { + private void testCompletionLate(OperationConsistencyLevel consistencyLevel) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())); try { OperationContextImpl impl = new OperationContextImpl(executor); final CountDownLatch latch1 = new CountDownLatch(1); final CountDownLatch latch2 = new CountDownLatch(1); - if (storeOnly) { + if (consistencyLevel == OperationConsistencyLevel.STORAGE) { // if storeOnly, then the pageSyncLinup and replication lineup should not bother the results impl.pageSyncLineUp(); impl.replicationLineUp(); @@ -211,7 +213,7 @@ public class OperationContextUnitTest extends ServerTestBase { public void done() { latch1.countDown(); } - }, storeOnly); + }, consistencyLevel); impl.storeLineUpField = 350000; impl.stored = impl.storeLineUpField - 1; @@ -234,7 +236,7 @@ public class OperationContextUnitTest extends ServerTestBase { public void done() { latch2.countDown(); } - }, storeOnly); + }, consistencyLevel); impl.done(); @@ -292,6 +294,112 @@ public class OperationContextUnitTest extends ServerTestBase { } } + @Test + public void testIgnoreReplication() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())); + runAfter(executor::shutdownNow); + ConcurrentLinkedQueue ignoreReplicationCompletions = new ConcurrentLinkedQueue(); + ConcurrentLinkedQueue regularCompletion = new ConcurrentLinkedQueue(); + final int N = 500; + final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor)); + + // pending work to queue completions till done + impl.storeLineUp(); + impl.replicationLineUp(); + + for (long l = 0; l < N; l++) { + long finalL = l; + impl.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + ignoreReplicationCompletions.add(finalL); + } + }, OperationConsistencyLevel.IGNORE_REPLICATION); + impl.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + regularCompletion.add(finalL); + } + }, OperationConsistencyLevel.FULL); + } + + flushExecutor(executor); + assertEquals(0, ignoreReplicationCompletions.size()); + assertEquals(0, regularCompletion.size()); + impl.done(); + + flushExecutor(executor); + assertEquals(N, ignoreReplicationCompletions.size()); + assertEquals(0, regularCompletion.size()); + + impl.replicationDone(); + flushExecutor(executor); + + assertEquals(N, regularCompletion.size()); + + for (long i = 0; i < N; i++) { + assertEquals(i, (long) ignoreReplicationCompletions.poll(), "ordered"); + assertEquals(i, (long) regularCompletion.poll(), "ordered"); + } + } + + private void flushExecutor(Executor executor) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + executor.execute(latch::countDown); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + @Test + public void testWaitOnReplication() throws Exception { + ExecutorService executor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())); + runAfter(executor::shutdownNow); + + ConcurrentLinkedQueue completions = new ConcurrentLinkedQueue(); + + final int N = 500; + final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor)); + + // pending work to queue completions till done + impl.storeLineUp(); + impl.replicationLineUp(); + + for (long l = 0; l < N; l++) { + long finalL = l; + impl.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + completions.add(finalL); + } + }); + } + + impl.done(); + + flushExecutor(executor); + assertEquals(0, completions.size()); + + impl.replicationDone(); + flushExecutor(executor); + + Wait.assertEquals(N, ()-> completions.size(), 5000, 100); + + for (long i = 0; i < N; i++) { + assertEquals(i, (long) completions.poll(), "ordered"); + } + + } @Test public void testErrorNotLostOnPageSyncError() throws Exception { @@ -317,7 +425,7 @@ public class OperationContextUnitTest extends ServerTestBase { } try { - final int numJobs = 10000; + final int numJobs = 1000; final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs); for (int i = 0; i < numJobs; i++) { @@ -342,14 +450,7 @@ public class OperationContextUnitTest extends ServerTestBase { done.await(); } - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return errorsOnLateRegister.await(1, TimeUnit.SECONDS); - } - })); - + assertTrue(errorsOnLateRegister.await(10, TimeUnit.SECONDS)); } finally { executor.shutdown(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 64e407d342..83ad4d8db7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; @@ -431,6 +432,11 @@ public class TransactionImplTest extends ServerTestBase { run.done(); } + @Override + public void afterCompleteOperations(IOCallback run, OperationConsistencyLevel consistencyLevel) { + run.done(); + } + @Override public void afterStoreOperations(IOCallback run) { run.done(); diff --git a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java index eb32a4448e..46d823c3f2 100644 --- a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java +++ b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java @@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; @@ -6636,7 +6637,7 @@ public class PagingTest extends ParameterDBTestBase { } @Override - public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) { runnable.done(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index dfb2ef016a..04f5038203 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; @@ -389,6 +390,11 @@ public class SendAckFailTest extends SpawnedTestBase { manager.afterCompleteOperations(run); } + @Override + public void afterCompleteOperations(IOCallback run, OperationConsistencyLevel level) { + manager.afterCompleteOperations(run); + } + @Override public void afterStoreOperations(IOCallback run) { manager.afterStoreOperations(run); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java index b00178290a..0be8cee090 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; @@ -95,7 +96,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase { public void onError(int errorCode, String errorMessage) { } - }, true); + }, OperationConsistencyLevel.STORAGE); assertTrue(latch.await(1, TimeUnit.MINUTES)); diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java new file mode 100644 index 0000000000..6025fc679a --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.brokerConnection; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.File; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** This test will keep a consumer active on both nodes. Up to the point an ack would give up. + * It has configured a massive number of retries, so even after keeping the consumer on for some time should not make the ack retry to go away. + * as soon as the consumer gives up the message the retry should succeed. */ +public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase { + + private static final String QUEUE_NAME = "MirrorInfiniteRetryReplicaTestQueue"; + + public static final String DC1_NODE = "MirrorInfiniteRetryReplicaTest/DC1"; + public static final String DC2_NODE = "MirrorInfiniteRetryReplicaTest/DC2"; + public static final String DC2_REPLICA_NODE = "MirrorInfiniteRetryReplicaTest/DC2_REPLICA"; + public static final String DC1_REPLICA_NODE = "MirrorInfiniteRetryReplicaTest/DC1_REPLICA"; + + volatile Process processDC1; + volatile Process processDC2; + volatile Process processDC1_REPLICA; + volatile Process processDC2_REPLICA; + + + // change this to true to have the server producing more detailed logs + private static final boolean TRACE_LOGS = false; + + @AfterEach + public void destroyServers() throws Exception { + if (processDC2_REPLICA != null) { + processDC2_REPLICA.destroyForcibly(); + processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES); + processDC2_REPLICA = null; + } + if (processDC1_REPLICA != null) { + processDC1_REPLICA.destroyForcibly(); + processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES); + processDC1_REPLICA = null; + } + if (processDC1 != null) { + processDC1.destroyForcibly(); + processDC1.waitFor(1, TimeUnit.MINUTES); + processDC1 = null; + } + if (processDC2 != null) { + processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; + } + } + + private static final String DC1_IP = "localhost:61616"; + private static final String DC1_BACKUP_IP = "localhost:61617"; + private static final String DC2_IP = "localhost:61618"; + private static final String DC2_BACKUP_IP = "localhost:61619"; + + private static String uri(String ip) { + return "tcp://" + ip; + } + + private static String uriWithAlternate(String ip, String alternate) { + return "tcp://" + ip + "#tcp://" + alternate; + } + + private static void createMirroredServer(String serverName, + String connectionName, + String mirrorURI, + int portOffset, + boolean replicated, + String clusterStatic) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setNoWeb(true); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.addArgs("--queues", QUEUE_NAME); + cliCreateServer.setPortOffset(portOffset); + if (replicated) { + cliCreateServer.setReplicated(true); + cliCreateServer.setStaticCluster(clusterStatic); + cliCreateServer.setClustered(true); + } else { + cliCreateServer.setClustered(false); + } + + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("messageExpiryScanPeriod", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI); + brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + + brokerProperties.put("addressSettings.#.maxSizeMessages", "50000"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); + // if we don't use pageTransactions we may eventually get a few duplicates + brokerProperties.put("mirrorPageTransaction", "true"); + brokerProperties.put("mirrorAckManagerQueueAttempts", "1000000"); // massive amount of retries, it should keep retrying even if there is a consumer holding a message + + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + if (TRACE_LOGS) { + replaceLogs(serverLocation); + } + + } + + private static void replaceLogs(File serverLocation) throws Exception { + File log4j = new File(serverLocation, "/etc/log4j2.properties"); + assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", + "logger.artemis_utils.level=INFO\n" + "\n" + + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + + "logger.endpoint.level=DEBUG\n" + + "logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" + + "logger.ackmanager.level=INFO\n" + + + "logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" + + "logger.mirrorTarget.level=INFO\n" + + + "appender.console.filter.threshold.type = ThresholdFilter\n" + + "appender.console.filter.threshold.level = trace")); + } + + private static void createMirroredBackupServer(String serverName, + int portOffset, + String clusterStatic, + String mirrorURI) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.setPortOffset(portOffset); + cliCreateServer.setClustered(true); + cliCreateServer.setReplicated(true); + cliCreateServer.setBackup(true); + cliCreateServer.setStaticCluster(clusterStatic); + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("messageExpiryScanPeriod", "1000"); + brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI); + brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000"); + brokerProperties.put("AMQPConnections.mirror.type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + + brokerProperties.put("addressSettings.#.maxSizeMessages", "1"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); + + brokerProperties.put("mirrorAckManagerQueueAttempts", "1000000"); // massive amount of retries, it should keep retrying even if there is a consumer holding a message + + // if we don't use pageTransactions we may eventually get a few duplicates + brokerProperties.put("mirrorPageTransaction", "true"); + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + assertTrue(brokerXml.exists()); + // Adding redistribution delay to broker configuration + assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + assertTrue(FileUtil.findReplace(brokerXml, "10M", "100K")); + + if (TRACE_LOGS) { + replaceLogs(serverLocation); + } + } + + public static void createRealServers() throws Exception { + createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP, DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP)); + createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP), uriWithAlternate(DC2_IP, DC2_BACKUP_IP)); + createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP, DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP)); + createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP), uriWithAlternate(DC1_IP, DC1_BACKUP_IP)); + } + + @Test + public void testConsumersAttached() throws Exception { + createRealServers(); + + SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null); + SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null); + + processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties")); + processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties")); + + processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties")); + processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new File(getServerLocation(DC1_REPLICA_NODE), "broker.properties")); + + ServerUtil.waitForServerToStart(2, 10_000); + Wait.assertTrue(managementDC2::isReplicaSync); + + ServerUtil.waitForServerToStart(0, 10_000); + Wait.assertTrue(managementDC1::isReplicaSync); + + runAfter(() -> managementDC1.close()); + runAfter(() -> managementDC2.close()); + + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP)); + try (Connection connection = connectionFactoryDC1A.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + TextMessage message = session.createTextMessage("Simple message"); + message.setIntProperty("i", 1); + message.setBooleanProperty("large", false); + producer.send(message); + session.commit(); + } + + ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", uri(DC2_IP)); + try (Connection connectionDC2 = connectionFactoryDC2A.createConnection(); Connection connectionDC1 = connectionFactoryDC1A.createConnection()) { + connectionDC2.start(); + connectionDC1.start(); + + // we will receive the message and hold it... + Session sessionDC2 = connectionDC2.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = sessionDC2.createQueue(QUEUE_NAME); + MessageConsumer consumerDC2 = sessionDC2.createConsumer(queue); + assertNotNull(consumerDC2.receive(5000)); + + Session sessionDC1 = connectionDC1.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumerDC1 = sessionDC1.createConsumer(queue); + assertNotNull(consumerDC1.receive(5000)); + sessionDC1.commit(); + + assertEquals(1, managementDC2.getMessageCountOnQueue(QUEUE_NAME)); + + // we roll it back and close the consumer, the message should now be back to be retried correctly + sessionDC2.rollback(); + consumerDC2.close(); + Wait.assertEquals(0, () -> managementDC2.getDeliveringCountOnQueue(QUEUE_NAME)); + Wait.assertEquals(0, () -> managementDC2.getMessageCountOnQueue(QUEUE_NAME)); + } + } + +} \ No newline at end of file