ARTEMIS-5001 Relax consistency requirement on OperationContext for Mirror send operations.

Send operations should ignore replication, while the ack of the message should wait a round trip in replication.
That will allow us to ack the message faster and still have consistency with its replica.
This commit is contained in:
Clebert Suconic 2024-08-26 12:52:53 -04:00 committed by clebertsuconic
parent 2b9b81142b
commit ffafbf350a
22 changed files with 666 additions and 123 deletions

View File

@ -710,8 +710,8 @@ public final class ActiveMQDefaultConfiguration {
// These properties used to defined with this prefix.
// I'm keeping the older property name in an attempt to guarantee compatibility
private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));;
@ -1953,19 +1953,19 @@ public final class ActiveMQDefaultConfiguration {
/** This configures the Mirror Ack Manager number of attempts on queues before trying page acks.
* It is not intended to be configured through the XML.
* The default value here is 5. */
public static int getMirrorAckManagerMinQueueAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS;
public static int getMirrorAckManagerQueueAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS;
}
public static int getMirrorAckManagerMaxPageAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS;
public static int getMirrorAckManagerPageAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS;
}
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}
public static boolean getDefaultMirrorPageTransaction() {
public static boolean getMirrorPageTransaction() {
return DEFAULT_MIRROR_PAGE_TRANSACTION;
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io;
public enum OperationConsistencyLevel {
FULL, STORAGE, IGNORE_REPLICATION
}

View File

@ -232,6 +232,9 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
// callers must be synchronized
private void removed(MapRecord<K, V> record) {
if (logger.isTraceEnabled()) {
logger.trace("Removing record {}", record);
}
try {
journal.deleteMapRecord(record.id, false);
} catch (Exception e) {

View File

@ -702,8 +702,9 @@ public class AMQPSessionCallback implements SessionCallback {
storageManager.setContext(oldContext);
}
/** Set the proper operation context in the Thread Local.
* Return the old context*/
public OperationContext recoverContext() {
OperationContext oldContext = storageManager.getContext();
manager.getServer().getStorageManager().setContext(serverSession.getSessionContext());
return oldContext;

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -261,10 +262,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
recoverContext();
OperationContext oldContext = recoverContext();
incrementSettle();
logger.trace("{}::actualdelivery call for {}", server, message);
logger.trace("{}::actualDelivery call for {}", server, message);
setControllerInUse(this);
delivery.setContext(message);
@ -337,8 +338,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
} finally {
setControllerInUse(null);
if (messageAckOperation != null) {
server.getStorageManager().afterCompleteOperations(messageAckOperation);
server.getStorageManager().afterCompleteOperations(messageAckOperation, OperationConsistencyLevel.FULL);
}
OperationContextImpl.setContext(oldContext);
}
}
@ -475,7 +478,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
ackManager.ack(nodeID, targetQueue, messageID, reason, true);
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL);
}
/**
@ -536,8 +539,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
message.setAddress(internalAddress);
}
// notice that MirrorTransaction is overriding getRequiredConsistency that is being set to ignore Replication.
// that means in case the target server is using replication, we will not wait for a roundtrip before the message is sent
// however we will wait the roundtrip before acking the message
// This is to alleviate a situation where messages would take too long to be delivered and be ready for ack
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAllowPageTransaction(configuration.isMirrorPageTransaction()).setAsync(true);
transaction.addOperation(messageCompletionAck.tx);
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
@ -550,6 +556,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();
server.getStorageManager().afterCompleteOperations(messageCompletionAck, OperationConsistencyLevel.FULL);
flow();
// return true here will instruct the caller to ignore any references to messageCompletionAck

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -139,7 +140,14 @@ public class AckManager implements ActiveMQComponent {
// schedule a retry
if (!sortRetries().isEmpty()) {
scheduledComponent.delay();
ActiveMQScheduledComponent scheduleComponentReference = scheduledComponent;
if (scheduleComponentReference != null) {
try {
scheduleComponentReference.delay();
} catch (RejectedExecutionException ree) {
logger.debug("AckManager could not schedule a new retry due to the executor being shutdown {}", ree.getMessage(), ree);
}
}
}
}
@ -277,12 +285,12 @@ public class AckManager implements ActiveMQComponent {
if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) {
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts());
logger.debug("Retried {} {} times, giving up on the entry now. Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts());
logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.List;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@ -26,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/** MirrorTransaction disable some syncs in storage, and plays with OperationConsistencyLevel to relax some of the syncs required for Mirroring. */
public class MirrorTransaction extends TransactionImpl {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -60,4 +62,9 @@ public class MirrorTransaction extends TransactionImpl {
this.allowPageTransaction = allowPageTransaction;
return this;
}
@Override
protected OperationConsistencyLevel getRequiredConsistency() {
return OperationConsistencyLevel.IGNORE_REPLICATION;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -91,8 +92,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
return protonSession;
}
protected void recoverContext() {
sessionSPI.recoverContext();
/** Set the proper operation context in the Thread Local.
* Return the old context*/
protected OperationContext recoverContext() {
return sessionSPI.recoverContext();
}
protected void closeCurrentReader() {

View File

@ -440,13 +440,13 @@ public class ConfigurationImpl implements Configuration, Serializable {
private boolean managementMessagesRbac = ActiveMQDefaultConfiguration.getManagementMessagesRbac();
private int mirrorAckManagerMinQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts();
private int mirrorAckManagerQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerQueueAttempts();
private int mirrorAckManagerMaxPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts();
private int mirrorAckManagerPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts();
private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction();
private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction();
/**
@ -941,6 +941,9 @@ public class ConfigurationImpl implements Configuration, Serializable {
// Identify the property name and value(s) to be assigned
final String name = entry.getKey();
try {
if (logger.isDebugEnabled()) {
logger.debug("set property target={}, name = {}, value = {}", target.getClass(), name, entry.getValue());
}
// Perform the assignment for this property
beanUtils.setProperty(target, name, entry.getValue());
} catch (InvocationTargetException invocationTargetException) {
@ -3379,25 +3382,25 @@ public class ConfigurationImpl implements Configuration, Serializable {
@Override
public int getMirrorAckManagerQueueAttempts() {
return mirrorAckManagerMinQueueAttempts;
return mirrorAckManagerQueueAttempts;
}
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts);
this.mirrorAckManagerMinQueueAttempts = minQueueAttempts;
this.mirrorAckManagerQueueAttempts = minQueueAttempts;
return this;
}
@Override
public int getMirrorAckManagerPageAttempts() {
return this.mirrorAckManagerMaxPageAttempts;
return this.mirrorAckManagerPageAttempts;
}
@Override
public ConfigurationImpl setMirrorAckManagerPageAttempts(int maxPageAttempts) {
logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}", maxPageAttempts);
this.mirrorAckManagerMaxPageAttempts = maxPageAttempts;
this.mirrorAckManagerPageAttempts = maxPageAttempts;
return this;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.persistence;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.journal.IOCompletion;
/**
@ -29,13 +30,9 @@ public interface OperationContext extends IOCompletion {
/**
* Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending.
*
* @param runnable the tas to be executed.
* @param storeOnly There are tasks that won't need to wait on replication or paging and will need to
* be completed as soon as the response from the journal is received. An example would be the
* DuplicateCache
* Notice it's possible to pass a consistencyLevel to what should be waited before completing the operation.
*/
void executeOnCompletion(IOCallback runnable, boolean storeOnly);
void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel);
/**
* Execute the task when all IO operations are complete,

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
@ -135,6 +136,7 @@ public interface StorageManager extends MapStorageManager, IDGenerator, ActiveMQ
void afterCompleteOperations(IOCallback run);
void afterCompleteOperations(IOCallback run, OperationConsistencyLevel consistencyLevel);
/**
* This is similar to afterComplete, however this only cares about the journal part.
*/

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
@ -354,9 +355,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
getContext().executeOnCompletion(run);
}
@Override
public void afterCompleteOperations(final IOCallback run, OperationConsistencyLevel consistencyLevel) {
getContext().executeOnCompletion(run, consistencyLevel);
}
@Override
public void afterStoreOperations(IOCallback run) {
getContext().executeOnCompletion(run, true);
getContext().executeOnCompletion(run, OperationConsistencyLevel.STORAGE);
}
@Override
@ -2120,8 +2126,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
executeOnCompletion(runnable);
public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) {
// There are no executeOnCompletion calls while using the DummyOperationContext
// However we keep the code here for correctness
runnable.done();
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.OperationContext;
public final class DummyOperationContext implements OperationContext {
@ -35,7 +36,7 @@ public final class DummyOperationContext implements OperationContext {
}
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) {
// There are no executeOnCompletion calls while using the DummyOperationContext
// However we keep the code here for correctness
runnable.done();

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -75,13 +76,9 @@ public class OperationContextImpl implements OperationContext {
}
LinkedList<TaskHolder> tasks;
LinkedList<IgnoreReplicationTaskHolder> ignoreReplicationTasks;
LinkedList<StoreOnlyTaskHolder> storeOnlyTasks;
long minimalStore = Long.MAX_VALUE;
long minimalReplicated = Long.MAX_VALUE;
long minimalPage = Long.MAX_VALUE;
static final AtomicIntegerFieldUpdater<OperationContextImpl> EXECUTORS_PENDING_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(OperationContextImpl.class, "executorsPendingField");
@ -94,6 +91,29 @@ public class OperationContextImpl implements OperationContext {
static final AtomicLongFieldUpdater<OperationContextImpl> PAGE_LINEUP_UPDATER = AtomicLongFieldUpdater
.newUpdater(OperationContextImpl.class, "pageLineUpField");
public long getReplicationLineUpField() {
return replicationLineUpField;
}
public long getReplicated() {
return replicated;
}
public long getStoreLineUpField() {
return storeLineUpField;
}
public long getStored() {
return stored;
}
public long getPagedLinedUpField() {
return pageLineUpField;
}
public long getPaged() {
return paged;
}
volatile int executorsPendingField = 0;
volatile long storeLineUpField = 0;
@ -165,11 +185,11 @@ public class OperationContextImpl implements OperationContext {
@Override
public void executeOnCompletion(IOCallback runnable) {
executeOnCompletion(runnable, false);
executeOnCompletion(runnable, OperationConsistencyLevel.FULL);
}
@Override
public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
public void executeOnCompletion(final IOCallback completion, final OperationConsistencyLevel consistencyLevel) {
boolean executeNow = false;
synchronized (this) {
@ -177,44 +197,64 @@ public class OperationContextImpl implements OperationContext {
final long storeLined = STORE_LINEUP_UPDATER.get(this);
final long pageLined = PAGE_LINEUP_UPDATER.get(this);
final long replicationLined = REPLICATION_LINEUP_UPDATER.get(this);
if (storeOnly) {
switch (consistencyLevel) {
case STORAGE:
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
}
} else {
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = replicationLined;
minimalStore = storeLined;
minimalPage = pageLined;
}
}
// On this case, we can just execute the context directly
if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (EXECUTORS_PENDING_UPDATER.get(this) == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
if (storeLined == stored) {
if (hasNoPendingExecution()) {
// setting executeNow = true will make the completion to be called within the same thread here
// without using an executor
executeNow = true;
} else {
execute(completion);
}
} else {
if (storeOnly) {
if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) {
executeNow = true;
} else {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
}
break;
case IGNORE_REPLICATION:
if (ignoreReplicationTasks == null) {
ignoreReplicationTasks = new LinkedList<>();
}
if (storeLined == stored && pageLined == paged) {
if (hasNoPendingExecution()) {
// setting executeNow = true will make the completion to be called within the same thread here
// without using an executor
executeNow = true;
} else {
execute(completion);
}
} else {
ignoreReplicationTasks.add(new IgnoreReplicationTaskHolder(completion, storeLined, pageLined));
}
break;
case FULL:
if (tasks == null) {
tasks = new LinkedList<>();
}
if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (hasNoPendingExecution()) {
// setting executeNow = true will make the completion to be called within the same thread here
// without using an executor
executeNow = true;
} else {
execute(completion);
}
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);
tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
}
break;
}
}
}
@ -228,17 +268,8 @@ public class OperationContextImpl implements OperationContext {
}
private boolean validateTasksAdd(long storeLined, long replicationLined, long pageLined) {
if (tasks.isEmpty()) {
return true;
}
final TaskHolder holder = tasks.peekLast();
if (holder.storeLined > storeLined ||
holder.replicationLined > replicationLined ||
holder.pageLined > pageLined) {
return false;
}
return true;
private boolean hasNoPendingExecution() {
return EXECUTORS_PENDING_UPDATER.get(this) == 0;
}
@Override
@ -273,14 +304,13 @@ public class OperationContextImpl implements OperationContext {
}
}
private void checkCompleteContext() {
private void checkRegularCompletion() {
final LinkedList<TaskHolder> tasks = this.tasks;
assert tasks != null;
final int size = this.tasks.size();
if (size == 0) {
return;
}
assert size >= 1;
// no need to use an iterator here, we can save that cost
for (int i = 0; i < size; i++) {
final TaskHolder holder = tasks.peek();
@ -294,14 +324,37 @@ public class OperationContextImpl implements OperationContext {
}
}
private void checkIgnoreReplicationCompletion() {
final LinkedList<IgnoreReplicationTaskHolder> tasks = this.ignoreReplicationTasks;
assert tasks != null;
final int size = tasks.size();
if (size == 0) {
return;
}
for (int i = 0; i < size; i++) {
final IgnoreReplicationTaskHolder holder = tasks.peek();
if (stored < holder.storeLined || paged < holder.pageLined) {
// End of list here. No other task will be completed after this
return;
}
execute(holder.task);
final IgnoreReplicationTaskHolder removed = tasks.poll();
assert removed == holder;
}
}
private void checkTasks() {
if (storeOnlyTasks != null) {
if (storeOnlyTasks != null && !storeOnlyTasks.isEmpty()) {
checkStoreTasks();
}
if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) {
checkCompleteContext();
if (tasks != null && !tasks.isEmpty()) {
checkRegularCompletion();
}
if (ignoreReplicationTasks != null && !ignoreReplicationTasks.isEmpty()) {
checkIgnoreReplicationCompletion();
}
}
@ -377,6 +430,30 @@ public class OperationContextImpl implements OperationContext {
}
}
static final class IgnoreReplicationTaskHolder {
@Override
public String toString() {
return "IgnoreReplicationTaskHolder [storeLined=" + storeLined +
", pageLined=" +
pageLined +
", task=" +
task +
"]";
}
long storeLined;
long pageLined;
final IOCallback task;
IgnoreReplicationTaskHolder(final IOCallback task, long storeLined, long pageLined) {
this.storeLined = storeLined;
this.pageLined = pageLined;
this.task = task;
}
}
/**
* This class has been created to both better capture the intention that the {@link IOCallback} is related to a
* store-only operation and to reduce the memory footprint for store-only cases, given that many fields of
@ -419,21 +496,17 @@ public class OperationContextImpl implements OperationContext {
@Override
public String toString() {
return "OperationContextImpl [" + hashCode() + "] [minimalStore=" + minimalStore +
return "OperationContextImpl@" + Integer.toHexString(System.identityHashCode(this)) + "[" +
", storeLineUp=" +
storeLineUpField +
", stored=" +
stored +
", minimalReplicated=" +
minimalReplicated +
", replicationLineUp=" +
replicationLineUpField +
", replicated=" +
replicated +
", paged=" +
paged +
", minimalPage=" +
minimalPage +
", pageLineUp=" +
pageLineUpField +
", errorCode=" +
@ -449,11 +522,9 @@ public class OperationContextImpl implements OperationContext {
public synchronized void reset() {
stored = 0;
storeLineUpField = 0;
minimalReplicated = 0;
replicated = 0;
replicationLineUpField = 0;
paged = 0;
minimalPage = 0;
pageLineUpField = 0;
errorCode = -1;
errorMessage = null;

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
@ -156,7 +157,7 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) {
runnable.done();
}
@ -425,6 +426,11 @@ public class NullStorageManager implements StorageManager {
run.done();
}
@Override
public void afterCompleteOperations(IOCallback run, OperationConsistencyLevel consistencyLevel) {
run.done();
}
@Override
public void afterStoreOperations(IOCallback run) {
run.done();

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionTimeoutException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
@ -257,6 +258,10 @@ public class TransactionImpl implements Transaction {
}
}
protected OperationConsistencyLevel getRequiredConsistency() {
return OperationConsistencyLevel.FULL;
}
@Override
public void commit(final boolean onePhase) throws Exception {
logger.trace("TransactionImpl::commit::{}", this);
@ -313,7 +318,7 @@ public class TransactionImpl implements Transaction {
public void done() {
afterCommit(operationsToComplete);
}
});
}, getRequiredConsistency());
final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
this.storeOperations = null;

View File

@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.tests.util.ServerTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
@ -119,7 +121,7 @@ public class OperationContextUnitTest extends ServerTestBase {
public void done() {
latch1.countDown();
}
}, true);
}, OperationConsistencyLevel.STORAGE);
impl.storeLineUp();
@ -133,7 +135,7 @@ public class OperationContextUnitTest extends ServerTestBase {
public void done() {
latch3.countDown();
}
}, true);
}, OperationConsistencyLevel.STORAGE);
impl.done();
@ -158,7 +160,7 @@ public class OperationContextUnitTest extends ServerTestBase {
public void done() {
latch2.countDown();
}
}, true);
}, OperationConsistencyLevel.STORAGE);
assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
@ -179,22 +181,22 @@ public class OperationContextUnitTest extends ServerTestBase {
@Test
public void testCompletionLateStoreOnly() throws Exception {
testCompletionLate(true);
testCompletionLate(OperationConsistencyLevel.STORAGE);
}
@Test
public void testCompletionLate() throws Exception {
testCompletionLate(false);
testCompletionLate(OperationConsistencyLevel.FULL);
}
private void testCompletionLate(boolean storeOnly) throws Exception {
private void testCompletionLate(OperationConsistencyLevel consistencyLevel) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
try {
OperationContextImpl impl = new OperationContextImpl(executor);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
if (storeOnly) {
if (consistencyLevel == OperationConsistencyLevel.STORAGE) {
// if storeOnly, then the pageSyncLinup and replication lineup should not bother the results
impl.pageSyncLineUp();
impl.replicationLineUp();
@ -211,7 +213,7 @@ public class OperationContextUnitTest extends ServerTestBase {
public void done() {
latch1.countDown();
}
}, storeOnly);
}, consistencyLevel);
impl.storeLineUpField = 350000;
impl.stored = impl.storeLineUpField - 1;
@ -234,7 +236,7 @@ public class OperationContextUnitTest extends ServerTestBase {
public void done() {
latch2.countDown();
}
}, storeOnly);
}, consistencyLevel);
impl.done();
@ -292,6 +294,112 @@ public class OperationContextUnitTest extends ServerTestBase {
}
}
@Test
public void testIgnoreReplication() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
runAfter(executor::shutdownNow);
ConcurrentLinkedQueue<Long> ignoreReplicationCompletions = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Long> regularCompletion = new ConcurrentLinkedQueue();
final int N = 500;
final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor));
// pending work to queue completions till done
impl.storeLineUp();
impl.replicationLineUp();
for (long l = 0; l < N; l++) {
long finalL = l;
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
ignoreReplicationCompletions.add(finalL);
}
}, OperationConsistencyLevel.IGNORE_REPLICATION);
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
regularCompletion.add(finalL);
}
}, OperationConsistencyLevel.FULL);
}
flushExecutor(executor);
assertEquals(0, ignoreReplicationCompletions.size());
assertEquals(0, regularCompletion.size());
impl.done();
flushExecutor(executor);
assertEquals(N, ignoreReplicationCompletions.size());
assertEquals(0, regularCompletion.size());
impl.replicationDone();
flushExecutor(executor);
assertEquals(N, regularCompletion.size());
for (long i = 0; i < N; i++) {
assertEquals(i, (long) ignoreReplicationCompletions.poll(), "ordered");
assertEquals(i, (long) regularCompletion.poll(), "ordered");
}
}
private void flushExecutor(Executor executor) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
executor.execute(latch::countDown);
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@Test
public void testWaitOnReplication() throws Exception {
ExecutorService executor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
runAfter(executor::shutdownNow);
ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
final int N = 500;
final OperationContextImpl impl = new OperationContextImpl(new OrderedExecutor(executor));
// pending work to queue completions till done
impl.storeLineUp();
impl.replicationLineUp();
for (long l = 0; l < N; l++) {
long finalL = l;
impl.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
}
@Override
public void done() {
completions.add(finalL);
}
});
}
impl.done();
flushExecutor(executor);
assertEquals(0, completions.size());
impl.replicationDone();
flushExecutor(executor);
Wait.assertEquals(N, ()-> completions.size(), 5000, 100);
for (long i = 0; i < N; i++) {
assertEquals(i, (long) completions.poll(), "ordered");
}
}
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {
@ -317,7 +425,7 @@ public class OperationContextUnitTest extends ServerTestBase {
}
try {
final int numJobs = 10000;
final int numJobs = 1000;
final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs);
for (int i = 0; i < numJobs; i++) {
@ -342,14 +450,7 @@ public class OperationContextUnitTest extends ServerTestBase {
done.await();
}
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return errorsOnLateRegister.await(1, TimeUnit.SECONDS);
}
}));
assertTrue(errorsOnLateRegister.await(10, TimeUnit.SECONDS));
} finally {
executor.shutdown();

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
@ -431,6 +432,11 @@ public class TransactionImplTest extends ServerTestBase {
run.done();
}
@Override
public void afterCompleteOperations(IOCallback run, OperationConsistencyLevel consistencyLevel) {
run.done();
}
@Override
public void afterStoreOperations(IOCallback run) {
run.done();

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -6636,7 +6637,7 @@ public class PagingTest extends ParameterDBTestBase {
}
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
public void executeOnCompletion(IOCallback runnable, OperationConsistencyLevel consistencyLevel) {
runnable.done();
}
}

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
@ -389,6 +390,11 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.afterCompleteOperations(run);
}
@Override
public void afterCompleteOperations(IOCallback run, OperationConsistencyLevel level) {
manager.afterCompleteOperations(run);
}
@Override
public void afterStoreOperations(IOCallback run) {
manager.afterStoreOperations(run);

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
@ -95,7 +96,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void onError(int errorCode, String errorMessage) {
}
}, true);
}, OperationConsistencyLevel.STORAGE);
assertTrue(latch.await(1, TimeUnit.MINUTES));

View File

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** This test will keep a consumer active on both nodes. Up to the point an ack would give up.
* It has configured a massive number of retries, so even after keeping the consumer on for some time should not make the ack retry to go away.
* as soon as the consumer gives up the message the retry should succeed. */
public class MirrorInfiniteRetryReplicaTest extends SmokeTestBase {
private static final String QUEUE_NAME = "MirrorInfiniteRetryReplicaTestQueue";
public static final String DC1_NODE = "MirrorInfiniteRetryReplicaTest/DC1";
public static final String DC2_NODE = "MirrorInfiniteRetryReplicaTest/DC2";
public static final String DC2_REPLICA_NODE = "MirrorInfiniteRetryReplicaTest/DC2_REPLICA";
public static final String DC1_REPLICA_NODE = "MirrorInfiniteRetryReplicaTest/DC1_REPLICA";
volatile Process processDC1;
volatile Process processDC2;
volatile Process processDC1_REPLICA;
volatile Process processDC2_REPLICA;
// change this to true to have the server producing more detailed logs
private static final boolean TRACE_LOGS = false;
@AfterEach
public void destroyServers() throws Exception {
if (processDC2_REPLICA != null) {
processDC2_REPLICA.destroyForcibly();
processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
processDC2_REPLICA = null;
}
if (processDC1_REPLICA != null) {
processDC1_REPLICA.destroyForcibly();
processDC1_REPLICA.waitFor(1, TimeUnit.MINUTES);
processDC1_REPLICA = null;
}
if (processDC1 != null) {
processDC1.destroyForcibly();
processDC1.waitFor(1, TimeUnit.MINUTES);
processDC1 = null;
}
if (processDC2 != null) {
processDC2.destroyForcibly();
processDC2.waitFor(1, TimeUnit.MINUTES);
processDC2 = null;
}
}
private static final String DC1_IP = "localhost:61616";
private static final String DC1_BACKUP_IP = "localhost:61617";
private static final String DC2_IP = "localhost:61618";
private static final String DC2_BACKUP_IP = "localhost:61619";
private static String uri(String ip) {
return "tcp://" + ip;
}
private static String uriWithAlternate(String ip, String alternate) {
return "tcp://" + ip + "#tcp://" + alternate;
}
private static void createMirroredServer(String serverName,
String connectionName,
String mirrorURI,
int portOffset,
boolean replicated,
String clusterStatic) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.setPortOffset(portOffset);
if (replicated) {
cliCreateServer.setReplicated(true);
cliCreateServer.setStaticCluster(clusterStatic);
cliCreateServer.setClustered(true);
} else {
cliCreateServer.setClustered(false);
}
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("messageExpiryScanPeriod", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "50000");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
brokerProperties.put("mirrorAckManagerQueueAttempts", "1000000"); // massive amount of retries, it should keep retrying even if there is a consumer holding a message
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
if (TRACE_LOGS) {
replaceLogs(serverLocation);
}
}
private static void replaceLogs(File serverLocation) throws Exception {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO",
"logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" +
"logger.endpoint.level=DEBUG\n" +
"logger.ackmanager.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" +
"logger.ackmanager.level=INFO\n" +
"logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" +
"logger.mirrorTarget.level=INFO\n" +
"appender.console.filter.threshold.type = ThresholdFilter\n" +
"appender.console.filter.threshold.level = trace"));
}
private static void createMirroredBackupServer(String serverName,
int portOffset,
String clusterStatic,
String mirrorURI) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.setPortOffset(portOffset);
cliCreateServer.setClustered(true);
cliCreateServer.setReplicated(true);
cliCreateServer.setBackup(true);
cliCreateServer.setStaticCluster(clusterStatic);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("messageExpiryScanPeriod", "1000");
brokerProperties.put("AMQPConnections.mirror.uri", mirrorURI);
brokerProperties.put("AMQPConnections.mirror.retryInterval", "1000");
brokerProperties.put("AMQPConnections.mirror.type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
brokerProperties.put("mirrorAckManagerQueueAttempts", "1000000"); // massive amount of retries, it should keep retrying even if there is a consumer holding a message
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
File brokerXml = new File(serverLocation, "/etc/broker.xml");
assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
assertTrue(FileUtil.findReplace(brokerXml, "<page-size-bytes>10M</page-size-bytes>", "<page-size-bytes>100K</page-size-bytes>"));
if (TRACE_LOGS) {
replaceLogs(serverLocation);
}
}
public static void createRealServers() throws Exception {
createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP, DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP));
createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP), uriWithAlternate(DC2_IP, DC2_BACKUP_IP));
createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP, DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP));
createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP), uriWithAlternate(DC1_IP, DC1_BACKUP_IP));
}
@Test
public void testConsumersAttached() throws Exception {
createRealServers();
SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null);
SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null);
processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties"));
processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new File(getServerLocation(DC1_REPLICA_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
Wait.assertTrue(managementDC2::isReplicaSync);
ServerUtil.waitForServerToStart(0, 10_000);
Wait.assertTrue(managementDC1::isReplicaSync);
runAfter(() -> managementDC1.close());
runAfter(() -> managementDC2.close());
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP));
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
TextMessage message = session.createTextMessage("Simple message");
message.setIntProperty("i", 1);
message.setBooleanProperty("large", false);
producer.send(message);
session.commit();
}
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", uri(DC2_IP));
try (Connection connectionDC2 = connectionFactoryDC2A.createConnection(); Connection connectionDC1 = connectionFactoryDC1A.createConnection()) {
connectionDC2.start();
connectionDC1.start();
// we will receive the message and hold it...
Session sessionDC2 = connectionDC2.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionDC2.createQueue(QUEUE_NAME);
MessageConsumer consumerDC2 = sessionDC2.createConsumer(queue);
assertNotNull(consumerDC2.receive(5000));
Session sessionDC1 = connectionDC1.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumerDC1 = sessionDC1.createConsumer(queue);
assertNotNull(consumerDC1.receive(5000));
sessionDC1.commit();
assertEquals(1, managementDC2.getMessageCountOnQueue(QUEUE_NAME));
// we roll it back and close the consumer, the message should now be back to be retried correctly
sessionDC2.rollback();
consumerDC2.close();
Wait.assertEquals(0, () -> managementDC2.getDeliveringCountOnQueue(QUEUE_NAME));
Wait.assertEquals(0, () -> managementDC2.getMessageCountOnQueue(QUEUE_NAME));
}
}
}