ARTEMIS-5001 Option to relax sync on journal replication for Mirroring

Say you use Mirroring and journal replication combined.

The target will wait a round trip on replica before sends are done.
It is possible to ignore that rountrip now with an option added into Configuration#mirrorReplicaSync
This commit is contained in:
Clebert Suconic 2024-08-19 20:55:02 -04:00 committed by clebertsuconic
parent 534526511f
commit d41f01a5aa
20 changed files with 583 additions and 36 deletions

View File

@ -710,13 +710,15 @@ public final class ActiveMQDefaultConfiguration {
// These properties used to defined with this prefix.
// I'm keeping the older property name in an attempt to guarantee compatibility
private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
private static final boolean DEFAULT_MIRROR_REPLICA_SYNC = true;
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1953,19 +1955,23 @@ public final class ActiveMQDefaultConfiguration {
/** This configures the Mirror Ack Manager number of attempts on queues before trying page acks.
* It is not intended to be configured through the XML.
* The default value here is 5. */
public static int getMirrorAckManagerMinQueueAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS;
public static int getMirrorAckManagerQueueAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS;
}
public static int getMirrorAckManagerMaxPageAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS;
public static int getMirrorAckManagerPageAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS;
}
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}
public static boolean getDefaultMirrorPageTransaction() {
public static boolean getMirrorReplicaSync() {
return DEFAULT_MIRROR_REPLICA_SYNC;
}
public static boolean getMirrorPageTransaction() {
return DEFAULT_MIRROR_PAGE_TRANSACTION;
}

View File

@ -233,6 +233,9 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
// callers must be synchronized
private void removed(MapRecord<K, V> record) {
if (logger.isTraceEnabled()) {
logger.info("Removing record {}", record, new Exception("trace"));
}
try {
journal.appendDeleteRecord(record.id, false);
} catch (Exception e) {

View File

@ -702,8 +702,9 @@ public class AMQPSessionCallback implements SessionCallback {
storageManager.setContext(oldContext);
}
/** Set the proper operation context in the Thread Local.
* Return the old context*/
public OperationContext recoverContext() {
OperationContext oldContext = storageManager.getContext();
manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
return oldContext;

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
@ -63,6 +64,8 @@ import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
@ -95,6 +98,22 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
return CONTROLLER_THREAD_LOCAL.get();
}
/** The rate in milliseconds that we will print OperationContext debug information on the mirror target */
private static final int DEBUG_CONTEXT_PERIOD;
private ScheduledFuture<?> scheduledRateDebugFuture = null;
static {
int period;
try {
period = Integer.parseInt(System.getProperty(AMQPMirrorControllerTarget.class.getName() + ".DEBUG_CONTEXT_PERIOD", "5000"));
} catch (Throwable e) {
logger.debug(e.getMessage(), e);
period = 0;
}
DEBUG_CONTEXT_PERIOD = period;
}
/**
* Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
@ -201,12 +220,23 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
creditRunnable.run();
}
@Override
protected OperationContext recoverContext() {
OperationContext oldContext = super.recoverContext();
OperationContextImpl.getContext().setSyncReplication(configuration.isMirrorReplicaSync());
return oldContext;
}
@Override
protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
recoverContext();
OperationContext oldContext = recoverContext();
scheduleRateDebug();
incrementSettle();
logger.trace("{}::actualdelivery call for {}", server, message);
logger.trace("{}::actualDelivery call for {}", server, message);
setControllerInUse(this);
delivery.setContext(message);
@ -281,6 +311,21 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
if (messageAckOperation != null) {
server.getStorageManager().afterCompleteOperations(messageAckOperation);
}
OperationContextImpl.setContext(oldContext);
}
}
private void scheduleRateDebug() {
if (logger.isDebugEnabled()) { // no need to schedule rate debug if no debug allowed
if (DEBUG_CONTEXT_PERIOD > 0 && scheduledRateDebugFuture == null) {
OperationContextImpl context = (OperationContextImpl) OperationContextImpl.getContext();
scheduledRateDebugFuture = server.getScheduledPool().scheduleAtFixedRate(() -> {
logger.debug(">>> OperationContext rate information: synReplica={}, replicationLineup = {}. replicationDone = {}, pending replica (back pressure) = {}, storeLineUp = {}, storeDone = {}, pageLineUp = {}, paged = {}", configuration.isMirrorReplicaSync(), context.getReplicationLineUpField(), context.getReplicated(), (context.getReplicationLineUpField() - context.getReplicated()), context.getStoreLineUpField(), context.getStored(), context.getPagedLinedUpField(), context.getPaged());
}, DEBUG_CONTEXT_PERIOD, DEBUG_CONTEXT_PERIOD, TimeUnit.MILLISECONDS);
}
} else {
cancelRateDebug();
}
}
@ -497,6 +542,23 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
return true;
}
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
cancelRateDebug();
}
private void cancelRateDebug() {
if (scheduledRateDebugFuture != null) {
try {
scheduledRateDebugFuture.cancel(true);
} catch (Throwable e) {
logger.debug("error on cancelRateDebug", e);
}
scheduledRateDebugFuture = null;
}
}
/** When the source mirror receives messages from a cluster member of his own, it should then fill targetQueues so we could play the same semantic the source applied on its routing */
private void targetQueuesRouting(final Message message,
final RoutingContext context,

View File

@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
@ -71,7 +72,7 @@ public class AckManager implements ActiveMQComponent {
final ReferenceIDSupplier referenceIDSupplier;
final IOCriticalErrorListener ioCriticalErrorListener;
volatile MultiStepProgress progress;
ActiveMQScheduledComponent scheduledComponent;
volatile ActiveMQScheduledComponent scheduledComponent;
public AckManager(ActiveMQServer server) {
this.server = server;
@ -137,7 +138,14 @@ public class AckManager implements ActiveMQComponent {
// schedule a retry
if (!sortRetries().isEmpty()) {
scheduledComponent.delay();
ActiveMQScheduledComponent scheduleComponentReference = scheduledComponent;
if (scheduleComponentReference != null) {
try {
scheduleComponentReference.delay();
} catch (RejectedExecutionException thatsOK) {
logger.debug(thatsOK.getMessage(), thatsOK);
}
}
}
}
@ -260,12 +268,12 @@ public class AckManager implements ActiveMQComponent {
if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) {
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts());
logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts());
logger.trace("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -91,8 +92,8 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
return protonSession;
}
protected void recoverContext() {
sessionSPI.recoverContext();
protected OperationContext recoverContext() {
return sessionSPI.recoverContext();
}
protected void closeCurrentReader() {

View File

@ -1535,4 +1535,10 @@ public interface Configuration {
boolean isMirrorPageTransaction();
Configuration setMirrorPageTransaction(boolean ignorePageTransactions);
/** It is possible to relax data synchronization requirements on a target mirror configured to use journal replication.
* If this is set to false Mirror Operations will not wait a response from replication before completing any operations */
boolean isMirrorReplicaSync();
Configuration setMirrorReplicaSync(boolean replicaSync);
}

View File

@ -440,13 +440,15 @@ public class ConfigurationImpl implements Configuration, Serializable {
private boolean managementMessagesRbac = ActiveMQDefaultConfiguration.getManagementMessagesRbac();
private int mirrorAckManagerMinQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts();
private int mirrorAckManagerQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerQueueAttempts();
private int mirrorAckManagerMaxPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts();
private int mirrorAckManagerPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts();
private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction();
private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction();
private boolean mirrorReplicaSync = ActiveMQDefaultConfiguration.getMirrorReplicaSync();
/**
@ -941,6 +943,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
// Identify the property name and value(s) to be assigned
final String name = entry.getKey();
try {
if (logger.isDebugEnabled()) {
logger.debug("set property target={}, name = {}, value = {}", target.getClass(), name, entry.getValue());
}
// Perform the assignment for this property
beanUtils.setProperty(target, name, entry.getValue());
} catch (InvocationTargetException invocationTargetException) {
@ -3379,25 +3384,25 @@ public class ConfigurationImpl implements Configuration, Serializable {
@Override
public int getMirrorAckManagerQueueAttempts() {
return mirrorAckManagerMinQueueAttempts;
return mirrorAckManagerQueueAttempts;
}
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts);
this.mirrorAckManagerMinQueueAttempts = minQueueAttempts;
this.mirrorAckManagerQueueAttempts = minQueueAttempts;
return this;
}
@Override
public int getMirrorAckManagerPageAttempts() {
return this.mirrorAckManagerMaxPageAttempts;
return this.mirrorAckManagerPageAttempts;
}
@Override
public ConfigurationImpl setMirrorAckManagerPageAttempts(int maxPageAttempts) {
logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}", maxPageAttempts);
this.mirrorAckManagerMaxPageAttempts = maxPageAttempts;
this.mirrorAckManagerPageAttempts = maxPageAttempts;
return this;
}
@ -3413,6 +3418,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public boolean isMirrorReplicaSync() {
return mirrorReplicaSync;
}
@Override
public ConfigurationImpl setMirrorReplicaSync(boolean replicaSync) {
logger.debug("setMirrorReplicaSync {}", replicaSync);
this.mirrorReplicaSync = replicaSync;
return this;
}
@Override
public boolean isMirrorPageTransaction() {
return mirrorPageTransaction;

View File

@ -389,6 +389,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction";
private static final String MIRROR_REPLICA_SYNC = "mirror-replica-sync";
private static final String INITIAL_QUEUE_BUFFER_SIZE = "initial-queue-buffer-size";
private boolean validateAIO = false;
@ -862,6 +864,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setMirrorPageTransaction(getBoolean(e, MIRROR_PAGE_TRANSACTION, config.isMirrorPageTransaction()));
config.setMirrorReplicaSync(getBoolean(e, MIRROR_REPLICA_SYNC, config.isMirrorReplicaSync()));
config.setMirrorAckManagerPageAttempts(getInteger(e, MIRROR_ACK_MANAGER_PAGE_ATTEMPTS, config.getMirrorAckManagerPageAttempts(), GT_ZERO));
config.setMirrorAckManagerQueueAttempts(getInteger(e, MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS, config.getMirrorAckManagerQueueAttempts(), GT_ZERO));

View File

@ -37,6 +37,13 @@ public interface OperationContext extends IOCompletion {
*/
void executeOnCompletion(IOCallback runnable, boolean storeOnly);
default void setSyncReplication(boolean syncReplication) {
}
default boolean isSyncReplication() {
return true;
}
/**
* Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending.

View File

@ -47,6 +47,19 @@ import org.apache.commons.collections.buffer.CircularFifoBuffer;
*/
public class OperationContextImpl implements OperationContext {
private boolean syncReplication = true;
@Override
public void setSyncReplication(boolean syncReplication) {
this.syncReplication = syncReplication;
}
@Override
public boolean isSyncReplication() {
return syncReplication;
}
private static final ThreadLocal<OperationContext> threadLocalContext = new ThreadLocal<>();
public static void clearContext() {
@ -94,6 +107,29 @@ public class OperationContextImpl implements OperationContext {
static final AtomicLongFieldUpdater<OperationContextImpl> PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater
.newUpdater(OperationContextImpl.class, "pageLineUpField");
public long getReplicationLineUpField() {
return replicationLineUpField;
}
public long getReplicated() {
return replicated;
}
public long getStoreLineUpField() {
return storeLineUpField;
}
public long getStored() {
return stored;
}
public long getPagedLinedUpField() {
return pageLineUpField;
}
public long getPaged() {
return paged;
}
volatile int executorsPendingField = 0;
volatile long storeLineUpField = 0;
@ -284,7 +320,7 @@ public class OperationContextImpl implements OperationContext {
// no need to use an iterator here, we can save that cost
for (int i = 0; i < size; i++) {
final TaskHolder holder = tasks.peek();
if (stored < holder.storeLined || replicated < holder.replicationLined || paged < holder.pageLined) {
if (stored < holder.storeLined || syncReplication && replicated < holder.replicationLined || paged < holder.pageLined) {
// End of list here. No other task will be completed after this
return;
}
@ -300,7 +336,7 @@ public class OperationContextImpl implements OperationContext {
checkStoreTasks();
}
if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) {
if (stored >= minimalStore && (!syncReplication || replicated >= minimalReplicated) && paged >= minimalPage) {
checkCompleteContext();
}
}

View File

@ -957,6 +957,16 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-replica-sync" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="true">
<xsd:annotation>
<xsd:documentation>
If journal replication is used on a target mirror, it is possible to ignore replica waits for any mirror operation.
This is exposed as mirrorReplicaSync on broker properties.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="suppress-session-notifications" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -161,5 +161,7 @@ public class DefaultsFileConfigurationTest extends AbstractConfigurationTestBase
assertEquals(ActiveMQDefaultConfiguration.getDefaultLoggingMetrics(), conf.getMetricsConfiguration().isLogging());
assertEquals(ActiveMQDefaultConfiguration.getDefaultSecurityCacheMetrics(), conf.getMetricsConfiguration().isSecurityCaches());
assertEquals(ActiveMQDefaultConfiguration.getMirrorReplicaSync(), conf.isMirrorReplicaSync());
}
}

View File

@ -582,6 +582,7 @@ public class FileConfigurationTest extends AbstractConfigurationTestBase {
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
assertEquals(333, conf.getMirrorAckManagerRetryDelay());
assertTrue(conf.isMirrorPageTransaction());
assertFalse(conf.isMirrorReplicaSync());
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());

View File

@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -292,6 +293,97 @@ public class OperationContextUnitTest extends ServerTestBase {
}
}
@Test
public void testIgnoreReplication() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
runAfter(executor::shutdownNow);
ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
final int N = 500;
final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor));
// pending work to queue completions till done
impl.storeLineUp();
impl.setSyncReplication(false);
impl.replicationLineUp();
for (long l = 0; l < N; l++) {
long finalL = l;
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
completions.add(finalL);
}
});
}
flushExecutor(executor);
assertEquals(0, completions.size());
impl.done();
flushExecutor(executor);
assertEquals(N, completions.size());
impl.replicationDone();
flushExecutor(executor);
for (long i = 0; i < N; i++) {
assertEquals(i, (long) completions.poll(), "ordered");
}
}
private void flushExecutor(Executor executor) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
executor.execute(latch::countDown);
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@Test
public void testWaitOnReplication() throws Exception {
ExecutorService executor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
runAfter(executor::shutdownNow);
ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
final int N = 500;
final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor));
// pending work to queue completions till done
impl.storeLineUp();
impl.replicationLineUp();
for (long l = 0; l < N; l++) {
long finalL = l;
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
completions.add(finalL);
}
});
}
impl.done();
flushExecutor(executor);
assertEquals(0, completions.size());
impl.replicationDone();
flushExecutor(executor);
Wait.assertEquals(N, ()-> completions.size(), 5000, 100);
for (long i = 0; i < N; i++) {
assertEquals(i, (long) completions.poll(), "ordered");
}
}
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {
@ -317,7 +409,7 @@ public class OperationContextUnitTest extends ServerTestBase {
}
try {
final int numJobs = 10000;
final int numJobs = 1000;
final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs);
for (int i = 0; i < numJobs; i++) {
@ -342,14 +434,7 @@ public class OperationContextUnitTest extends ServerTestBase {
done.await();
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return errorsOnLateRegister.await(1, TimeUnit.SECONDS);
}
}));
assertTrue(errorsOnLateRegister.await(10, TimeUnit.SECONDS));
} finally {
executor.shutdown();

View File

@ -557,6 +557,7 @@
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-page-transaction>true</mirror-page-transaction>
<mirror-replica-sync>false</mirror-replica-sync>
<security-settings>
<security-setting match="a1">

View File

@ -75,6 +75,7 @@
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-page-transaction>true</mirror-page-transaction>
<mirror-replica-sync>false</mirror-replica-sync>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>

View File

@ -75,6 +75,7 @@
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-page-transaction>true</mirror-page-transaction>
<mirror-replica-sync>false</mirror-replica-sync>
<xi:include href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-incoming-interceptors.xml"/>

View File

@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
private static final String QUEUE_NAME = "MirrorInfiniteRetryReplicaTestQueue";
public static final String DC1_NODE = "AckLateRetrySoakTest/DC1";
public static final String DC2_NODE = "AckLateRetrySoakTest/DC2";
public static final String DC2_REPLICA_NODE = "AckLateRetrySoakTest/DC2_REPLICA";
public static final String DC1_REPLICA_NODE = "AckLateRetrySoakTest/DC1_REPLICA";
volatile Process processDC1;
volatile Process processDC2;
volatile Process processDC1_REPLICA;
volatile Process processDC2_REPLICA;
@AfterEach
public void destroyServers() throws Exception {
if (processDC2_REPLICA != null) {
processDC2_REPLICA.destroyForcibly();
processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
processDC2_REPLICA = null;
}
if (processDC1_REPLICA != null) {
processDC1_REPLICA.destroyForcibly();
processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
processDC1_REPLICA = null;
}
if (processDC1 != null) {
processDC1.destroyForcibly();
processDC1.waitFor(1, TimeUnit.MINUTES);
processDC1 = null;
}
if (processDC2 != null) {
processDC2.destroyForcibly();
processDC2.waitFor(1, TimeUnit.MINUTES);
processDC2 = null;
}
}
private static final String DC1_IP = "localhost:61616";
private static final String DC1_BACKUP_IP = "localhost:61617";
private static final String DC2_IP = "localhost:61618";
private static final String DC2_BACKUP_IP = "localhost:61619";
private static String uri(String ip) {
return "tcp://" + ip;
}
private static String uriWithAlternate(String ip, String alternate) {
return "tcp://" + ip + "#tcp://" + alternate;
}
private static void createMirroredServer(String serverName,
String connectionName,
String mirrorURI,
int porOffset,
boolean replicated,
String clusterStatic) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.setPortOffset(porOffset);
if (replicated) {
cliCreateServer.setReplicated(true);
cliCreateServer.setStaticCluster(clusterStatic);
cliCreateServer.setClustered(true);
} else {
cliCreateServer.setClustered(false);
}
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("messageExpiryScanPeriod", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
brokerProperties.put("mirrorAckManagerQueueAttempts", "2");
brokerProperties.put("mirrorAckManagerPageAttempts", "500000");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
replaceLogs(serverLocation);
}
private static void replaceLogs(File serverLocation) throws Exception {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO",
"logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" +
"logger.endpoint.level=DEBUG\n" +
"logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" +
"logger.ackmanager.level=TRACE\n" +
"logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" +
"logger.mirrorTarget.level=TRACE\n" +
"appender.console.filter.threshold.type = ThresholdFilter\n" +
"appender.console.filter.threshold.level = trace"));
}
private static void createMirroredBackupServer(String serverName,
int porOffset,
String clusterStatic,
String mirrorURI) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.setPortOffset(porOffset);
cliCreateServer.setClustered(true);
cliCreateServer.setReplicated(true);
cliCreateServer.setBackup(true);
cliCreateServer.setStaticCluster(clusterStatic);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("messageExpiryScanPeriod", "1000");
brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI);
brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000");
brokerProperties.put("AMQPConnections.mirror.type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
brokerProperties.put("mirrorAckManagerQueueAttempts", "200");
brokerProperties.put("mirrorAckManagerPageAttempts", "200000");
brokerProperties.put("mirrorAckManagerRetryDelay", "10");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
File brokerXml = new File(serverLocation, "/etc/broker.xml");
assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
assertTrue(FileUtil.findReplace(brokerXml, "<page-size-bytes>10M</page-size-bytes>", "<page-size-bytes>100K</page-size-bytes>"));
replaceLogs(serverLocation);
}
public static void createRealServers() throws Exception {
createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP, DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP));
createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP), uriWithAlternate(DC2_IP, DC2_BACKUP_IP));
createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP, DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP));
createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP), uriWithAlternate(DC1_IP, DC1_BACKUP_IP));
}
@Test
public void testConsumersAttached() throws Exception {
createRealServers();
SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null);
SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null);
processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties"));
processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new File(getServerLocation(DC1_REPLICA_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
Wait.assertTrue(managementDC2::isReplicaSync);
ServerUtil.waitForServerToStart(0, 10_000);
Wait.assertTrue(managementDC1::isReplicaSync);
runAfter(() -> managementDC1.close());
runAfter(() -> managementDC2.close());
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP));
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
TextMessage message = session.createTextMessage("Simple message");
message.setIntProperty("i", 1);
message.setBooleanProperty("large", false);
producer.send(message);
session.commit();
}
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", uri(DC2_IP));
try (Connection connectionDC2 = connectionFactoryDC2A.createConnection(); Connection connectionDC1 = connectionFactoryDC1A.createConnection()) {
connectionDC2.start();
connectionDC1.start();
// we will receive the message and hold it...
Session sessionDC2 = connectionDC2.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionDC2.createQueue(QUEUE_NAME);
MessageConsumer consumerDC2 = sessionDC2.createConsumer(queue);
assertNotNull(consumerDC2.receive(5000));
Session sessionDC1 = connectionDC1.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumerDC1 = sessionDC1.createConsumer(queue);
assertNotNull(consumerDC1.receive(5000));
sessionDC1.commit();
assertEquals(1, managementDC2.getMessageCountOnQueue(QUEUE_NAME));
// we roll it back and close the consumer, the message should now be back to be retried correctly
sessionDC2.rollback();
consumerDC2.close();
Wait.assertEquals(0, () -> managementDC2.getDeliveringCountOnQueue(QUEUE_NAME));
}
}
}

View File

@ -191,6 +191,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
brokerProperties.put("mirrorReplicaSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
@ -208,7 +209,16 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
private static void replaceLogs(File serverLocation) throws Exception {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + "logger.endpoint.level=DEBUG\n" + "appender.console.filter.threshold.type = ThresholdFilter\n" + "appender.console.filter.threshold.level = info"));
assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO",
"logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" +
"logger.endpoint.level=INFO\n" +
"logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" +
"logger.mirrorTarget.level=DEBUG\n" +
"appender.console.filter.threshold.type = ThresholdFilter\n" +
"appender.console.filter.threshold.level = info"));
}
private static void createMirroredBackupServer(String serverName, int porOffset, String clusterStatic, String mirrorURI) throws Exception {
@ -245,6 +255,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
brokerProperties.put("mirrorReplicaSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
@ -403,6 +414,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
}
private static void sendMessages(String queueName) throws JMSException {
long start = System.currentTimeMillis();
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP));
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@ -415,6 +427,10 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
if (i > 0 && i % SEND_COMMIT == 0) {
logger.info("Sent {} messages on {}", i, queueName);
session.commit();
long timePassed = System.currentTimeMillis() - start;
double secondsPassed = timePassed / 1000f;
logger.info("sent {} messages, msgs/second = {}", i, (i / secondsPassed));
}
}