diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index b8500f153e..c3ab68eacb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -710,13 +710,15 @@ public final class ActiveMQDefaultConfiguration { // These properties used to defined with this prefix. // I'm keeping the older property name in an attempt to guarantee compatibility private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry"; - private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));; - private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));; + private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));; + private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));; private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));; private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false; + private static final boolean DEFAULT_MIRROR_REPLICA_SYNC = true; + /** * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. */ @@ -1953,19 +1955,23 @@ public final class ActiveMQDefaultConfiguration { /** This configures the Mirror Ack Manager number of attempts on queues before trying page acks. * It is not intended to be configured through the XML. * The default value here is 5. */ - public static int getMirrorAckManagerMinQueueAttempts() { - return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS; + public static int getMirrorAckManagerQueueAttempts() { + return DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS; } - public static int getMirrorAckManagerMaxPageAttempts() { - return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS; + public static int getMirrorAckManagerPageAttempts() { + return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS; } public static int getMirrorAckManagerRetryDelay() { return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY; } - public static boolean getDefaultMirrorPageTransaction() { + public static boolean getMirrorReplicaSync() { + return DEFAULT_MIRROR_REPLICA_SYNC; + } + + public static boolean getMirrorPageTransaction() { return DEFAULT_MIRROR_PAGE_TRANSACTION; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java index f0d70340dd..40f49e16da 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java @@ -233,6 +233,9 @@ public class JournalHashMap implements Map { // callers must be synchronized private void removed(MapRecord record) { + if (logger.isTraceEnabled()) { + logger.info("Removing record {}", record, new Exception("trace")); + } try { journal.appendDeleteRecord(record.id, false); } catch (Exception e) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index cf1c3ac641..b6d59ec4ce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -702,8 +702,9 @@ public class AMQPSessionCallback implements SessionCallback { storageManager.setContext(oldContext); } + /** Set the proper operation context in the Thread Local. + * Return the old context*/ public OperationContext recoverContext() { - OperationContext oldContext = storageManager.getContext(); manager.getServer().getStorageManager().setContext(serverSession.getSessionContext()); return oldContext; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index a5c549483f..1a854dc246 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader; @@ -63,6 +64,8 @@ import org.apache.qpid.proton.engine.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS; @@ -95,6 +98,22 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement return CONTROLLER_THREAD_LOCAL.get(); } + /** The rate in milliseconds that we will print OperationContext debug information on the mirror target */ + private static final int DEBUG_CONTEXT_PERIOD; + private ScheduledFuture scheduledRateDebugFuture = null; + + static { + int period; + try { + period = Integer.parseInt(System.getProperty(AMQPMirrorControllerTarget.class.getName() + ".DEBUG_CONTEXT_PERIOD", "5000")); + } catch (Throwable e) { + logger.debug(e.getMessage(), e); + period = 0; + } + + DEBUG_CONTEXT_PERIOD = period; + } + /** * Objects of this class can be used by either transaction or by OperationContext. * It is important that when you're using the transactions you clear any references to @@ -201,12 +220,23 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement creditRunnable.run(); } + + @Override + protected OperationContext recoverContext() { + OperationContext oldContext = super.recoverContext(); + OperationContextImpl.getContext().setSyncReplication(configuration.isMirrorReplicaSync()); + return oldContext; + } + @Override protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) { - recoverContext(); + OperationContext oldContext = recoverContext(); + + scheduleRateDebug(); + incrementSettle(); - logger.trace("{}::actualdelivery call for {}", server, message); + logger.trace("{}::actualDelivery call for {}", server, message); setControllerInUse(this); delivery.setContext(message); @@ -281,6 +311,21 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement if (messageAckOperation != null) { server.getStorageManager().afterCompleteOperations(messageAckOperation); } + + OperationContextImpl.setContext(oldContext); + } + } + + private void scheduleRateDebug() { + if (logger.isDebugEnabled()) { // no need to schedule rate debug if no debug allowed + if (DEBUG_CONTEXT_PERIOD > 0 && scheduledRateDebugFuture == null) { + OperationContextImpl context = (OperationContextImpl) OperationContextImpl.getContext(); + scheduledRateDebugFuture = server.getScheduledPool().scheduleAtFixedRate(() -> { + logger.debug(">>> OperationContext rate information: synReplica={}, replicationLineup = {}. replicationDone = {}, pending replica (back pressure) = {}, storeLineUp = {}, storeDone = {}, pageLineUp = {}, paged = {}", configuration.isMirrorReplicaSync(), context.getReplicationLineUpField(), context.getReplicated(), (context.getReplicationLineUpField() - context.getReplicated()), context.getStoreLineUpField(), context.getStored(), context.getPagedLinedUpField(), context.getPaged()); + }, DEBUG_CONTEXT_PERIOD, DEBUG_CONTEXT_PERIOD, TimeUnit.MILLISECONDS); + } + } else { + cancelRateDebug(); } } @@ -497,6 +542,23 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement return true; } + @Override + public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { + super.close(remoteLinkClose); + cancelRateDebug(); + } + + private void cancelRateDebug() { + if (scheduledRateDebugFuture != null) { + try { + scheduledRateDebugFuture.cancel(true); + } catch (Throwable e) { + logger.debug("error on cancelRateDebug", e); + } + scheduledRateDebugFuture = null; + } + } + /** When the source mirror receives messages from a cluster member of his own, it should then fill targetQueues so we could play the same semantic the source applied on its routing */ private void targetQueuesRouting(final Message message, final RoutingContext context, @@ -524,4 +586,4 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement // Do nothing } -} +} \ No newline at end of file diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 7bae95cad2..0414fae283 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -71,7 +72,7 @@ public class AckManager implements ActiveMQComponent { final ReferenceIDSupplier referenceIDSupplier; final IOCriticalErrorListener ioCriticalErrorListener; volatile MultiStepProgress progress; - ActiveMQScheduledComponent scheduledComponent; + volatile ActiveMQScheduledComponent scheduledComponent; public AckManager(ActiveMQServer server) { this.server = server; @@ -137,7 +138,14 @@ public class AckManager implements ActiveMQComponent { // schedule a retry if (!sortRetries().isEmpty()) { - scheduledComponent.delay(); + ActiveMQScheduledComponent scheduleComponentReference = scheduledComponent; + if (scheduleComponentReference != null) { + try { + scheduleComponentReference.delay(); + } catch (RejectedExecutionException thatsOK) { + logger.debug(thatsOK.getMessage(), thatsOK); + } + } } } @@ -260,12 +268,12 @@ public class AckManager implements ActiveMQComponent { if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) { if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) { if (logger.isDebugEnabled()) { - logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts()); + logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } retries.remove(retry); } else { if (logger.isDebugEnabled()) { - logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts()); + logger.trace("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 9dbfe5406d..323254c8d7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.lang.invoke.MethodHandles; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; @@ -91,8 +92,8 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme return protonSession; } - protected void recoverContext() { - sessionSPI.recoverContext(); + protected OperationContext recoverContext() { + return sessionSPI.recoverContext(); } protected void closeCurrentReader() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 1df52bd0bc..371dd27d6f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -1535,4 +1535,10 @@ public interface Configuration { boolean isMirrorPageTransaction(); Configuration setMirrorPageTransaction(boolean ignorePageTransactions); + + /** It is possible to relax data synchronization requirements on a target mirror configured to use journal replication. + * If this is set to false Mirror Operations will not wait a response from replication before completing any operations */ + boolean isMirrorReplicaSync(); + + Configuration setMirrorReplicaSync(boolean replicaSync); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 2a532d16bb..07931c1963 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -440,13 +440,15 @@ public class ConfigurationImpl implements Configuration, Serializable { private boolean managementMessagesRbac = ActiveMQDefaultConfiguration.getManagementMessagesRbac(); - private int mirrorAckManagerMinQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts(); + private int mirrorAckManagerQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerQueueAttempts(); - private int mirrorAckManagerMaxPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts(); + private int mirrorAckManagerPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts(); private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay(); - private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction(); + private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction(); + + private boolean mirrorReplicaSync = ActiveMQDefaultConfiguration.getMirrorReplicaSync(); /** @@ -941,6 +943,9 @@ public class ConfigurationImpl implements Configuration, Serializable { // Identify the property name and value(s) to be assigned final String name = entry.getKey(); try { + if (logger.isDebugEnabled()) { + logger.debug("set property target={}, name = {}, value = {}", target.getClass(), name, entry.getValue()); + } // Perform the assignment for this property beanUtils.setProperty(target, name, entry.getValue()); } catch (InvocationTargetException invocationTargetException) { @@ -3379,25 +3384,25 @@ public class ConfigurationImpl implements Configuration, Serializable { @Override public int getMirrorAckManagerQueueAttempts() { - return mirrorAckManagerMinQueueAttempts; + return mirrorAckManagerQueueAttempts; } @Override public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) { logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts); - this.mirrorAckManagerMinQueueAttempts = minQueueAttempts; + this.mirrorAckManagerQueueAttempts = minQueueAttempts; return this; } @Override public int getMirrorAckManagerPageAttempts() { - return this.mirrorAckManagerMaxPageAttempts; + return this.mirrorAckManagerPageAttempts; } @Override public ConfigurationImpl setMirrorAckManagerPageAttempts(int maxPageAttempts) { logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}", maxPageAttempts); - this.mirrorAckManagerMaxPageAttempts = maxPageAttempts; + this.mirrorAckManagerPageAttempts = maxPageAttempts; return this; } @@ -3413,6 +3418,18 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + @Override + public boolean isMirrorReplicaSync() { + return mirrorReplicaSync; + } + + @Override + public ConfigurationImpl setMirrorReplicaSync(boolean replicaSync) { + logger.debug("setMirrorReplicaSync {}", replicaSync); + this.mirrorReplicaSync = replicaSync; + return this; + } + @Override public boolean isMirrorPageTransaction() { return mirrorPageTransaction; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 0d8d2080e3..487e8ac12e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -389,6 +389,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction"; + private static final String MIRROR_REPLICA_SYNC = "mirror-replica-sync"; + private static final String INITIAL_QUEUE_BUFFER_SIZE = "initial-queue-buffer-size"; private boolean validateAIO = false; @@ -862,6 +864,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setMirrorPageTransaction(getBoolean(e, MIRROR_PAGE_TRANSACTION, config.isMirrorPageTransaction())); + config.setMirrorReplicaSync(getBoolean(e, MIRROR_REPLICA_SYNC, config.isMirrorReplicaSync())); + config.setMirrorAckManagerPageAttempts(getInteger(e, MIRROR_ACK_MANAGER_PAGE_ATTEMPTS, config.getMirrorAckManagerPageAttempts(), GT_ZERO)); config.setMirrorAckManagerQueueAttempts(getInteger(e, MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS, config.getMirrorAckManagerQueueAttempts(), GT_ZERO)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java index dd7276fdfc..a4d76c1e8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java @@ -37,6 +37,13 @@ public interface OperationContext extends IOCompletion { */ void executeOnCompletion(IOCallback runnable, boolean storeOnly); + default void setSyncReplication(boolean syncReplication) { + } + + default boolean isSyncReplication() { + return true; + } + /** * Execute the task when all IO operations are complete, * Or execute it immediately if nothing is pending. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index af61e206cc..787a3566dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -47,6 +47,19 @@ import org.apache.commons.collections.buffer.CircularFifoBuffer; */ public class OperationContextImpl implements OperationContext { + + private boolean syncReplication = true; + + @Override + public void setSyncReplication(boolean syncReplication) { + this.syncReplication = syncReplication; + } + + @Override + public boolean isSyncReplication() { + return syncReplication; + } + private static final ThreadLocal threadLocalContext = new ThreadLocal<>(); public static void clearContext() { @@ -94,6 +107,29 @@ public class OperationContextImpl implements OperationContext { static final AtomicLongFieldUpdater PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater .newUpdater(OperationContextImpl.class, "pageLineUpField"); + public long getReplicationLineUpField() { + return replicationLineUpField; + } + + public long getReplicated() { + return replicated; + } + + public long getStoreLineUpField() { + return storeLineUpField; + } + + public long getStored() { + return stored; + } + + public long getPagedLinedUpField() { + return pageLineUpField; + } + + public long getPaged() { + return paged; + } volatile int executorsPendingField = 0; volatile long storeLineUpField = 0; @@ -284,7 +320,7 @@ public class OperationContextImpl implements OperationContext { // no need to use an iterator here, we can save that cost for (int i = 0; i < size; i++) { final TaskHolder holder = tasks.peek(); - if (stored < holder.storeLined || replicated < holder.replicationLined || paged < holder.pageLined) { + if (stored < holder.storeLined || syncReplication && replicated < holder.replicationLined || paged < holder.pageLined) { // End of list here. No other task will be completed after this return; } @@ -300,7 +336,7 @@ public class OperationContextImpl implements OperationContext { checkStoreTasks(); } - if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) { + if (stored >= minimalStore && (!syncReplication || replicated >= minimalReplicated) && paged >= minimalPage) { checkCompleteContext(); } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 2eeb1e0d61..649514ed5b 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -957,6 +957,16 @@ + + + + If journal replication is used on a target mirror, it is possible to ignore replica waits for any mirror operation. + + This is exposed as mirrorReplicaSync on broker properties. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java index 05b65f70b6..0f9b54f0a3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java @@ -161,5 +161,7 @@ public class DefaultsFileConfigurationTest extends AbstractConfigurationTestBase assertEquals(ActiveMQDefaultConfiguration.getDefaultLoggingMetrics(), conf.getMetricsConfiguration().isLogging()); assertEquals(ActiveMQDefaultConfiguration.getDefaultSecurityCacheMetrics(), conf.getMetricsConfiguration().isSecurityCaches()); + + assertEquals(ActiveMQDefaultConfiguration.getMirrorReplicaSync(), conf.isMirrorReplicaSync()); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 331b0fa187..b657d70c8e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -582,6 +582,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase { assertEquals(222, conf.getMirrorAckManagerPageAttempts()); assertEquals(333, conf.getMirrorAckManagerRetryDelay()); assertTrue(conf.isMirrorPageTransaction()); + assertFalse(conf.isMirrorReplicaSync()); assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java index b9471e2713..11f49d322a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -292,6 +293,97 @@ public class OperationContextUnitTest extends ServerTestBase { } } + @Test + public void testIgnoreReplication() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())); + runAfter(executor::shutdownNow); + ConcurrentLinkedQueue completions = new ConcurrentLinkedQueue(); + final int N = 500; + final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor)); + + // pending work to queue completions till done + impl.storeLineUp(); + impl.setSyncReplication(false); + impl.replicationLineUp(); + + for (long l = 0; l < N; l++) { + long finalL = l; + impl.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + completions.add(finalL); + } + }); + } + + flushExecutor(executor); + assertEquals(0, completions.size()); + impl.done(); + + flushExecutor(executor); + assertEquals(N, completions.size()); + + impl.replicationDone(); + flushExecutor(executor); + + for (long i = 0; i < N; i++) { + assertEquals(i, (long) completions.poll(), "ordered"); + } + } + + private void flushExecutor(Executor executor) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + executor.execute(latch::countDown); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + @Test + public void testWaitOnReplication() throws Exception { + ExecutorService executor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())); + runAfter(executor::shutdownNow); + + ConcurrentLinkedQueue completions = new ConcurrentLinkedQueue(); + + final int N = 500; + final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor)); + + // pending work to queue completions till done + impl.storeLineUp(); + impl.replicationLineUp(); + + for (long l = 0; l < N; l++) { + long finalL = l; + impl.executeOnCompletion(new IOCallback() { + @Override + public void onError(int errorCode, String errorMessage) { + } + + @Override + public void done() { + completions.add(finalL); + } + }); + } + + impl.done(); + + flushExecutor(executor); + assertEquals(0, completions.size()); + + impl.replicationDone(); + flushExecutor(executor); + + Wait.assertEquals(N, ()-> completions.size(), 5000, 100); + + for (long i = 0; i < N; i++) { + assertEquals(i, (long) completions.poll(), "ordered"); + } + + } @Test public void testErrorNotLostOnPageSyncError() throws Exception { @@ -317,7 +409,7 @@ public class OperationContextUnitTest extends ServerTestBase { } try { - final int numJobs = 10000; + final int numJobs = 1000; final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs); for (int i = 0; i < numJobs; i++) { @@ -342,14 +434,7 @@ public class OperationContextUnitTest extends ServerTestBase { done.await(); } - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return errorsOnLateRegister.await(1, TimeUnit.SECONDS); - } - })); - + assertTrue(errorsOnLateRegister.await(10, TimeUnit.SECONDS)); } finally { executor.shutdown(); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 00d56e3292..a7a9ded3d1 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -557,6 +557,7 @@ 222 333 true + false diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index c0678ae9e2..3824a7a744 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -75,6 +75,7 @@ 222 333 true + false org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml index 1c163b4a14..c7d9ec1f8b 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml @@ -75,6 +75,7 @@ 222 333 true + false diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java new file mode 100644 index 0000000000..ddc2256a5a --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirrorInfiniteRetryReplicaTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.brokerConnection; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.File; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase { + + private static final String QUEUE_NAME = "MirrorInfiniteRetryReplicaTestQueue"; + + public static final String DC1_NODE = "AckLateRetrySoakTest/DC1"; + public static final String DC2_NODE = "AckLateRetrySoakTest/DC2"; + public static final String DC2_REPLICA_NODE = "AckLateRetrySoakTest/DC2_REPLICA"; + public static final String DC1_REPLICA_NODE = "AckLateRetrySoakTest/DC1_REPLICA"; + + volatile Process processDC1; + volatile Process processDC2; + volatile Process processDC1_REPLICA; + volatile Process processDC2_REPLICA; + + @AfterEach + public void destroyServers() throws Exception { + if (processDC2_REPLICA != null) { + processDC2_REPLICA.destroyForcibly(); + processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES); + processDC2_REPLICA = null; + } + if (processDC1_REPLICA != null) { + processDC1_REPLICA.destroyForcibly(); + processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES); + processDC1_REPLICA = null; + } + if (processDC1 != null) { + processDC1.destroyForcibly(); + processDC1.waitFor(1, TimeUnit.MINUTES); + processDC1 = null; + } + if (processDC2 != null) { + processDC2.destroyForcibly(); + processDC2.waitFor(1, TimeUnit.MINUTES); + processDC2 = null; + } + } + + private static final String DC1_IP = "localhost:61616"; + private static final String DC1_BACKUP_IP = "localhost:61617"; + private static final String DC2_IP = "localhost:61618"; + private static final String DC2_BACKUP_IP = "localhost:61619"; + + private static String uri(String ip) { + return "tcp://" + ip; + } + + private static String uriWithAlternate(String ip, String alternate) { + return "tcp://" + ip + "#tcp://" + alternate; + } + + private static void createMirroredServer(String serverName, + String connectionName, + String mirrorURI, + int porOffset, + boolean replicated, + String clusterStatic) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setNoWeb(true); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.addArgs("--queues", QUEUE_NAME); + cliCreateServer.setPortOffset(porOffset); + if (replicated) { + cliCreateServer.setReplicated(true); + cliCreateServer.setStaticCluster(clusterStatic); + cliCreateServer.setClustered(true); + } else { + cliCreateServer.setClustered(false); + } + + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("messageExpiryScanPeriod", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI); + brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000"); + brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + + brokerProperties.put("addressSettings.#.maxSizeMessages", "50000"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); + // if we don't use pageTransactions we may eventually get a few duplicates + brokerProperties.put("mirrorPageTransaction", "true"); + + brokerProperties.put("mirrorAckManagerQueueAttempts", "2"); + brokerProperties.put("mirrorAckManagerPageAttempts", "500000"); + brokerProperties.put("mirrorAckManagerRetryDelay", "1000"); + + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + replaceLogs(serverLocation); + + } + + private static void replaceLogs(File serverLocation) throws Exception { + File log4j = new File(serverLocation, "/etc/log4j2.properties"); + assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", + "logger.artemis_utils.level=INFO\n" + "\n" + + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + + "logger.endpoint.level=DEBUG\n" + + "logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" + + "logger.ackmanager.level=TRACE\n" + + + "logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" + + "logger.mirrorTarget.level=TRACE\n" + + + "appender.console.filter.threshold.type = ThresholdFilter\n" + + "appender.console.filter.threshold.level = trace")); + } + + private static void createMirroredBackupServer(String serverName, + int porOffset, + String clusterStatic, + String mirrorURI) throws Exception { + File serverLocation = getFileServerLocation(serverName); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = new HelperCreate(); + cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); + cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); + cliCreateServer.setNoWeb(true); + cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE); + cliCreateServer.setPortOffset(porOffset); + cliCreateServer.setClustered(true); + cliCreateServer.setReplicated(true); + cliCreateServer.setBackup(true); + cliCreateServer.setStaticCluster(clusterStatic); + cliCreateServer.createServer(); + + Properties brokerProperties = new Properties(); + brokerProperties.put("messageExpiryScanPeriod", "1000"); + brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI); + brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000"); + brokerProperties.put("AMQPConnections.mirror.type", AMQPBrokerConnectionAddressType.MIRROR.toString()); + brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", "false"); + brokerProperties.put("largeMessageSync", "false"); + + brokerProperties.put("addressSettings.#.maxSizeMessages", "1"); + brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000"); + brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1"); + brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); + + brokerProperties.put("mirrorAckManagerQueueAttempts", "200"); + brokerProperties.put("mirrorAckManagerPageAttempts", "200000"); + brokerProperties.put("mirrorAckManagerRetryDelay", "10"); + + // if we don't use pageTransactions we may eventually get a few duplicates + brokerProperties.put("mirrorPageTransaction", "true"); + File brokerPropertiesFile = new File(serverLocation, "broker.properties"); + saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXml = new File(serverLocation, "/etc/broker.xml"); + assertTrue(brokerXml.exists()); + // Adding redistribution delay to broker configuration + assertTrue(FileUtil.findReplace(brokerXml, "", "\n\n" + " 0 \n")); + assertTrue(FileUtil.findReplace(brokerXml, "10M", "100K")); + + replaceLogs(serverLocation); + } + + public static void createRealServers() throws Exception { + createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP, DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP)); + createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP), uriWithAlternate(DC2_IP, DC2_BACKUP_IP)); + createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP, DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP)); + createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP), uriWithAlternate(DC1_IP, DC1_BACKUP_IP)); + } + + @Test + public void testConsumersAttached() throws Exception { + createRealServers(); + + SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null); + SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null); + + processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties")); + processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties")); + + processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties")); + processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new File(getServerLocation(DC1_REPLICA_NODE), "broker.properties")); + + ServerUtil.waitForServerToStart(2, 10_000); + Wait.assertTrue(managementDC2::isReplicaSync); + + ServerUtil.waitForServerToStart(0, 10_000); + Wait.assertTrue(managementDC1::isReplicaSync); + + runAfter(() -> managementDC1.close()); + runAfter(() -> managementDC2.close()); + + ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP)); + try (Connection connection = connectionFactoryDC1A.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + TextMessage message = session.createTextMessage("Simple message"); + message.setIntProperty("i", 1); + message.setBooleanProperty("large", false); + producer.send(message); + session.commit(); + } + + ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", uri(DC2_IP)); + try (Connection connectionDC2 = connectionFactoryDC2A.createConnection(); Connection connectionDC1 = connectionFactoryDC1A.createConnection()) { + connectionDC2.start(); + connectionDC1.start(); + + // we will receive the message and hold it... + Session sessionDC2 = connectionDC2.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = sessionDC2.createQueue(QUEUE_NAME); + MessageConsumer consumerDC2 = sessionDC2.createConsumer(queue); + assertNotNull(consumerDC2.receive(5000)); + + Session sessionDC1 = connectionDC1.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumerDC1 = sessionDC1.createConsumer(queue); + assertNotNull(consumerDC1.receive(5000)); + sessionDC1.commit(); + + assertEquals(1, managementDC2.getMessageCountOnQueue(QUEUE_NAME)); + + // we roll it back and close the consumer, the message should now be back to be retried correctly + sessionDC2.rollback(); + consumerDC2.close(); + Wait.assertEquals(0, () -> managementDC2.getDeliveringCountOnQueue(QUEUE_NAME)); + } + } + +} \ No newline at end of file diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java index 33b1ffa64a..17ef670aab 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/ReplicatedBothNodesMirrorTest.java @@ -191,6 +191,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); // if we don't use pageTransactions we may eventually get a few duplicates brokerProperties.put("mirrorPageTransaction", "true"); + brokerProperties.put("mirrorReplicaSync", "false"); File brokerPropertiesFile = new File(serverLocation, "broker.properties"); saveProperties(brokerProperties, brokerPropertiesFile); @@ -208,7 +209,16 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { private static void replaceLogs(File serverLocation) throws Exception { File log4j = new File(serverLocation, "/etc/log4j2.properties"); - assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + "logger.endpoint.level=DEBUG\n" + "appender.console.filter.threshold.type = ThresholdFilter\n" + "appender.console.filter.threshold.level = info")); + assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", + "logger.artemis_utils.level=INFO\n" + "\n" + + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + + "logger.endpoint.level=INFO\n" + + "logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" + + "logger.mirrorTarget.level=DEBUG\n" + + "appender.console.filter.threshold.type = ThresholdFilter\n" + + "appender.console.filter.threshold.level = info")); + + } private static void createMirroredBackupServer(String serverName, int porOffset, String clusterStatic, String mirrorURI) throws Exception { @@ -245,6 +255,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { brokerProperties.put("addressSettings.#.prefetchPageMessages", "500"); // if we don't use pageTransactions we may eventually get a few duplicates brokerProperties.put("mirrorPageTransaction", "true"); + brokerProperties.put("mirrorReplicaSync", "false"); File brokerPropertiesFile = new File(serverLocation, "broker.properties"); saveProperties(brokerProperties, brokerPropertiesFile); @@ -403,6 +414,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { } private static void sendMessages(String queueName) throws JMSException { + long start = System.currentTimeMillis(); ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP)); try (Connection connection = connectionFactoryDC1A.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); @@ -415,6 +427,10 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase { if (i > 0 && i % SEND_COMMIT == 0) { logger.info("Sent {} messages on {}", i, queueName); session.commit(); + + long timePassed = System.currentTimeMillis() - start; + double secondsPassed = timePassed / 1000f; + logger.info("sent {} messages, msgs/second = {}", i, (i / secondsPassed)); } }