ARTEMIS-4758 Hardening Mirroring

This is a list of improvements done as part of this commit / task:

* Page Transactions on mirror target are now optional.

If you had an interrupt mirror while the target destination was paging, duplicate detection would be ineffective unless you used paged transactions
Users can now configure the ack manager retries intervals.
Say you need some time to remove a consumer from a target mirror. The delivering references would prevent acks from happening. You can allow bigger retry intervals and number of retries by tinkiering with ack manager retry parameters.

* AckManager restarted independent of incoming acks

The ackManager was only restarted when new acks were coming in. If you stopped receiving acks on a target server and restarted that server with pending acks, those acks would never be exercised. The AckManager is now restarted as soon as the server is started.
This commit is contained in:
Clebert Suconic 2024-05-02 10:28:42 -04:00 committed by clebertsuconic
parent afd7951eb2
commit c523458a9a
24 changed files with 670 additions and 70 deletions

View File

@ -212,15 +212,20 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
public synchronized ActiveMQScheduledComponent setPeriod(long period) {
this.period = period;
restartIfNeeded();
if (this.period != period) {
this.period = period;
restartIfNeeded();
}
return this;
}
public synchronized ActiveMQScheduledComponent setPeriod(long period, TimeUnit unit) {
this.period = period;
this.timeUnit = unit;
restartIfNeeded();
if (unit == null) throw new NullPointerException("unit is required");
if (this.period != period || this.timeUnit != unit) {
this.period = period;
this.timeUnit = unit;
restartIfNeeded();
}
return this;
}

View File

@ -201,6 +201,52 @@ public class ActiveMQScheduledComponentTest {
}
}
@Test
public void testUpdatePeriod() throws Throwable {
final ReusableLatch latch = new ReusableLatch(1);
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(10, TimeUnit.MILLISECONDS, true) {
@Override
public void run() {
latch.countDown();
}
};
local.start();
try {
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
latch.setCount(1);
local.delay();
Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));
latch.setCount(1);
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
local.setPeriod(TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS);
latch.setCount(1);
local.delay();
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
local.setPeriod(1);
local.delay();
Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));
local.setPeriod(1, TimeUnit.SECONDS);
latch.setCount(1);
local.delay();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} finally {
local.stop();
local.stop(); // calling stop again should not be an issue.
}
}
@Test
public void testUsingCustomInitialDelay() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -698,6 +698,17 @@ public final class ActiveMQDefaultConfiguration {
private static final boolean DEFAULT_MANAGEMENT_MESSAGE_RBAC = false;
// These properties used to defined with this prefix.
// I'm keeping the older property name in an attempt to guarantee compatibility
private static final String FORMER_ACK_RETRY_CLASS_NAME = "org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MIN_QUEUE_ATTEMPTS", "5"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".MAX_PAGE_ATTEMPT", "2"));;
private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + ".RETRY_DELAY", "100"));;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1918,4 +1929,25 @@ public final class ActiveMQDefaultConfiguration {
public static boolean getManagementMessagesRbac() {
return DEFAULT_MANAGEMENT_MESSAGE_RBAC;
}
/** This configures the Mirror Ack Manager number of attempts on queues before trying page acks.
* It is not intended to be configured through the XML.
* The default value here is 5. */
public static int getMirrorAckManagerMinQueueAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS;
}
public static int getMirrorAckManagerMaxPageAttempts() {
return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS;
}
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}
public static boolean getDefaultMirrorPageTransaction() {
return DEFAULT_MIRROR_PAGE_TRANSACTION;
}
}

View File

@ -37,6 +37,11 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* K = Key
* V = Value
* C = Context
* */
public class JournalHashMap<K, V, C> implements Map<K, V> {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

View File

@ -87,7 +87,8 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
@Override
public void loadProtocolServices(ActiveMQServer server, List<ActiveMQComponent> services) {
try {
AckManager ackManager = AckManagerProvider.getManager(server, false);
AckManager ackManager = AckManagerProvider.getManager(server);
services.add(ackManager);
server.registerRecordsLoader(ackManager::reload);
} catch (Exception e) {
logger.warn(e.getMessage(), e);

View File

@ -419,6 +419,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
snfQueue.deliverAsync();
}
private void syncDone(MessageReference reference) {
@ -516,6 +518,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
postACKInternalMessage(ref);
return;
}
snfQueue.deliverAsync();
}
@Override

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
@ -161,6 +162,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
final ActiveMQServer server;
final Configuration configuration;
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
@ -183,6 +186,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
this.basicController = new BasicMirrorController(server);
this.basicController.setLink(receiver);
this.server = server;
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
}
@ -389,8 +393,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
if (logger.isTraceEnabled()) {
logger.trace("Server {} with queue = {} being acked for {} coming from {} targetQueue = {}",
server.getIdentity(), queue, messageID, messageID, targetQueue);
logger.trace("Server {} with queue = {} being acked for {} from {} targetQueue = {} reason = {}",
server.getIdentity(), queue, messageID, ackMessage, targetQueue, reason);
}
performAck(nodeID, targetQueue, messageID, ackMessage, reason);
@ -407,7 +411,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
if (ackManager == null) {
ackManager = AckManagerProvider.getManager(server, true);
ackManager = AckManagerProvider.getManager(server);
}
ackManager.ack(nodeID, targetQueue, messageID, reason, true);
@ -473,7 +477,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
message.setAddress(internalAddress);
}
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAsync(true);
final TransactionImpl transaction = new MirrorTransaction(server.getStorageManager()).setAllowPageTransaction(configuration.isMirrorPageTransaction()).setAsync(true);
transaction.addOperation(messageCompletionAck.tx);
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);

View File

@ -29,6 +29,7 @@ import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -58,12 +59,6 @@ import org.slf4j.LoggerFactory;
public class AckManager implements ActiveMQComponent {
// we first retry on the queue a few times
private static final short MIN_QUEUE_ATTEMPTS = Short.parseShort(System.getProperty(AckRetry.class.getName() + ".MIN_QUEUE_ATTEMPTS", "5"));
private static final short MAX_PAGE_ATTEMPTS = Short.parseShort(System.getProperty(AckRetry.class.getName() + ".MAX_PAGE_ATTEMPT", "2"));
public static final int RETRY_DELAY = Integer.parseInt(System.getProperty(AckRetry.class.getName() + ".RETRY_DELAY", "100"));
private static DisabledAckMirrorController disabledAckMirrorController = new DisabledAckMirrorController();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -72,6 +67,7 @@ public class AckManager implements ActiveMQComponent {
final LongSupplier sequenceGenerator;
final JournalHashMapProvider<AckRetry, AckRetry, Queue> journalHashMapProvider;
final ActiveMQServer server;
final Configuration configuration;
final ReferenceIDSupplier referenceIDSupplier;
final IOCriticalErrorListener ioCriticalErrorListener;
volatile MultiStepProgress progress;
@ -79,6 +75,7 @@ public class AckManager implements ActiveMQComponent {
public AckManager(ActiveMQServer server) {
this.server = server;
this.configuration = server.getConfiguration();
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
this.journal = server.getStorageManager().getMessageJournal();
this.sequenceGenerator = server.getStorageManager()::generateID;
@ -107,9 +104,11 @@ public class AckManager implements ActiveMQComponent {
@Override
public synchronized void start() {
logger.debug("Starting ACKManager on {} with period = {}", server, RETRY_DELAY);
if (logger.isDebugEnabled()) {
logger.debug("Starting ACKManager on {} with period = {}, minQueueAttempts={}, maxPageAttempts={}", server, configuration.getMirrorAckManagerRetryDelay(), configuration.getMirrorAckManagerQueueAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
if (!isStarted()) {
scheduledComponent = new ActiveMQScheduledComponent(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), RETRY_DELAY, RETRY_DELAY, TimeUnit.MILLISECONDS, true) {
scheduledComponent = new ActiveMQScheduledComponent(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), server.getConfiguration().getMirrorAckManagerRetryDelay(), server.getConfiguration().getMirrorAckManagerRetryDelay(), TimeUnit.MILLISECONDS, true) {
@Override
public void run() {
beginRetry();
@ -203,20 +202,20 @@ public class AckManager implements ActiveMQComponent {
// to be used with the same executor as the PagingStore executor
public boolean retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
public boolean retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
boolean retriedPaging = false;
logger.debug("retrying address {} on server {}", address, server);
logger.trace("retrying address {} on server {}", address, server);
try {
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
if (checkRetriesAndPaging(queuesToRetry)) {
logger.debug("scanning paging for {}", address);
if (checkRetriesAndPaging(acksToRetry)) {
logger.trace("scanning paging for {}", address);
AckRetry key = new AckRetry();
PagingStore store = server.getPagingManager().getPageStore(address);
for (long pageId = store.getFirstPage(); pageId <= store.getCurrentWritingPage(); pageId++) {
if (isEmpty(queuesToRetry)) {
if (isEmpty(acksToRetry)) {
logger.trace("Retry stopped while reading page {} on address {} as the outcome is now empty, server={}", pageId, address, server);
break;
}
@ -225,16 +224,16 @@ public class AckManager implements ActiveMQComponent {
continue;
}
try {
if (retryPage(queuesToRetry, page, key)) {
if (retryPage(acksToRetry, page, key)) {
retriedPaging = true;
}
} finally {
page.usageDown();
}
}
validateExpiredSet(queuesToRetry);
validateExpiredSet(acksToRetry);
} else {
logger.debug("Page Scan not required for address {}", address);
logger.trace("Page Scan not required for address {}", address);
}
} catch (Throwable e) {
@ -251,15 +250,15 @@ public class AckManager implements ActiveMQComponent {
private void validateExpireSet(long queueID, JournalHashMap<AckRetry, AckRetry, Queue> retries) {
for (AckRetry retry : retries.valuesCopy()) {
if (retry.getQueueAttempts() >= MIN_QUEUE_ATTEMPTS) {
if (retry.attemptedPage() >= MAX_PAGE_ATTEMPTS) {
if (retry.getQueueAttempts() >= configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >= configuration.getMirrorAckManagerPageAttempts()) {
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Retry {} attempted {} times on paging", retry, retry.getPageAttempts());
logger.trace("Retry {} attempted {} times on paging", retry, retry.getPageAttempts());
}
}
}
@ -283,14 +282,14 @@ public class AckManager implements ActiveMQComponent {
}
long id = referenceIDSupplier.getID(pagedMessage.getMessage());
logger.debug("Looking for retry on serverID={}, id={} on server={}", serverID, id, server);
logger.trace("Looking for retry on serverID={}, id={} on server={}", serverID, id, server);
key.setNodeID(serverID).setMessageID(id);
AckRetry foundRetry = retries.get(key);
AckRetry ackRetry = retries.get(key);
// we first retry messages in the queue first.
// this is to avoid messages that are in transit from being depaged into the queue
if (foundRetry != null && foundRetry.getQueueAttempts() > MIN_QUEUE_ATTEMPTS) {
if (ackRetry != null && ackRetry.getQueueAttempts() > configuration.getMirrorAckManagerQueueAttempts()) {
Queue queue = retries.getContext();
if (queue != null) {
@ -307,9 +306,9 @@ public class AckManager implements ActiveMQComponent {
}
}
}
retries.remove(foundRetry, transaction.getID());
retries.remove(ackRetry, transaction.getID());
transaction.setContainsPersistent();
logger.debug("retry found = {} for message={} on queue", foundRetry, pagedMessage);
logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage);
}
}
} else {
@ -341,14 +340,14 @@ public class AckManager implements ActiveMQComponent {
Queue queue = queueRetries.getContext();
for (AckRetry retry : queueRetries.valuesCopy()) {
if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) {
logger.debug("Removing retry {} as the retry went ok", retry);
logger.trace("Removing retry {} as the retry went ok", retry);
queueRetries.remove(retry);
} else {
int retried = retry.attemptedQueue();
if (logger.isDebugEnabled()) {
logger.debug("retry {} attempted {} times on the queue", retry, retried);
if (logger.isTraceEnabled()) {
logger.trace("retry {} attempted {} times on the queue", retry, retried);
}
if (retried >= MIN_QUEUE_ATTEMPTS) {
if (retried >= configuration.getMirrorAckManagerQueueAttempts()) {
needScanOnPaging = true;
}
}
@ -365,6 +364,8 @@ public class AckManager implements ActiveMQComponent {
AckRetry retry = new AckRetry(nodeID, messageID, reason);
journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
if (scheduledComponent != null) {
// we set the retry delay again in case it was changed.
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
scheduledComponent.delay();
}
}
@ -377,20 +378,29 @@ public class AckManager implements ActiveMQComponent {
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceIDSupplier);
if (reference == null) {
logger.debug("Could not find retry on reference nodeID={} (while localID={}), messageID={} on queue {}, server={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server);
if (logger.isDebugEnabled()) {
logger.debug("ACK Manager could not find reference nodeID={} (while localID={}), messageID={} on queue {}, server={}. Adding retry with minQueue={}, maxPage={}, delay={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server, configuration.getMirrorAckManagerQueueAttempts(), configuration.getMirrorAckManagerPageAttempts(), configuration.getMirrorAckManagerRetryDelay());
printQueueDebug(targetQueue);
}
if (allowRetry) {
addRetry(nodeID, targetQueue, messageID, reason);
}
return false;
} else {
if (logger.isTraceEnabled()) {
logger.trace("ack {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue);
logger.trace("ack worked well for messageID={} nodeID={} queue={}, reference={}", messageID, nodeID, reference.getQueue().getName(), reference);
if (reference.isPaged()) {
logger.trace("position for messageID={} = {}", messageID, ((PagedReference)reference).getPosition());
}
}
doACK(targetQueue, reference, reason);
return true;
}
}
private void printQueueDebug(Queue targetQueue) {
logger.debug("... queue {}/{} had {} consumers, {} messages, {} scheduled messages, {} delivering messages, paging={}", targetQueue.getID(), targetQueue.getName(), targetQueue.getConsumerCount(), targetQueue.getMessageCount(), targetQueue.getScheduledCount(), targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging());
}
private void doACK(Queue targetQueue, MessageReference reference, AckReason reason) {
try {
@ -399,9 +409,12 @@ public class AckManager implements ActiveMQComponent {
targetQueue.expire(reference, null, false);
break;
default:
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
TransactionImpl transaction = new TransactionImpl(server.getStorageManager());
targetQueue.acknowledge(transaction, reference, reason, null, false);
transaction.commit();
if (logger.isTraceEnabled()) {
logger.trace("Transaction {} committed on acking reference {}", transaction.getID(), reference);
}
break;
}
} catch (Exception e) {
@ -428,11 +441,6 @@ public class AckManager implements ActiveMQComponent {
public void nextStep() {
try {
if (!retryIterator.hasNext()) {
if (retriedPaging) {
logger.debug("Retried acks on paging, better to rebuild the page counters");
server.getPagingManager().rebuildCounters(null);
}
logger.trace("Iterator is done on retry, server={}", server);
AckManager.this.endRetry();
} else {

View File

@ -47,13 +47,10 @@ public class AckManagerProvider {
}
}
public static AckManager getManager(ActiveMQServer server, boolean start) {
public static AckManager getManager(ActiveMQServer server) {
synchronized (managerHashMap) {
AckManager ackManager = managerHashMap.get(server);
if (ackManager != null) {
if (start && !ackManager.isStarted()) {
ackManager.start();
}
return ackManager;
}
@ -64,9 +61,6 @@ public class AckManagerProvider {
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
if (start) {
ackManager.start();
}
return ackManager;
}

View File

@ -30,6 +30,8 @@ public class MirrorTransaction extends TransactionImpl {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
boolean allowPageTransaction;
MirrorController controlInUse;
public MirrorTransaction(StorageManager storageManager) {
@ -49,4 +51,13 @@ public class MirrorTransaction extends TransactionImpl {
}
}
@Override
public boolean isAllowPageTransaction() {
return allowPageTransaction;
}
public MirrorTransaction setAllowPageTransaction(boolean allowPageTransaction) {
this.allowPageTransaction = allowPageTransaction;
return this;
}
}

View File

@ -1509,4 +1509,30 @@ public interface Configuration {
void setManagementRbacPrefix(String prefix);
/** This configures the Mirror Ack Manager number of attempts on queues before trying page acks.
* The default value here is 5. */
int getMirrorAckManagerQueueAttempts();
Configuration setMirrorAckManagerQueueAttempts(int queueAttempts);
/** This configures the Mirror Ack Manager number of attempts on page acks.
* The default value here is 2. */
int getMirrorAckManagerPageAttempts();
Configuration setMirrorAckManagerPageAttempts(int pageAttempts);
/** This configures the interval in which the Mirror AckManager will retry acks when
* It is not intended to be configured through the XML.
* The default value here is 100, and this is in milliseconds. */
int getMirrorAckManagerRetryDelay();
Configuration setMirrorAckManagerRetryDelay(int delay);
/** Should Mirror use Page Transactions When target destinations is paging?
* When a target queue on the mirror is paged, the mirror will not record a page transaction for every message.
* The default is false, and the overhead of paged messages will be smaller, but there is a possibility of eventual duplicates in case of interrupted communication between the mirror source and target.
* If you set this to true there will be a record stored on the journal for the page-transaction additionally to the record in the page store. */
boolean isMirrorPageTransaction();
Configuration setMirrorPageTransaction(boolean ignorePageTransactions);
}

View File

@ -438,6 +438,15 @@ public class ConfigurationImpl implements Configuration, Serializable {
private boolean managementMessagesRbac = ActiveMQDefaultConfiguration.getManagementMessagesRbac();
private int mirrorAckManagerMinQueueAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts();
private int mirrorAckManagerMaxPageAttempts = ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts();
private int mirrorAckManagerRetryDelay = ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction();
/**
* Parent folder for all data folders.
*/
@ -3353,6 +3362,55 @@ public class ConfigurationImpl implements Configuration, Serializable {
this.managementRbacPrefix = prefix;
}
@Override
public int getMirrorAckManagerQueueAttempts() {
return mirrorAckManagerMinQueueAttempts;
}
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}", minQueueAttempts);
this.mirrorAckManagerMinQueueAttempts = minQueueAttempts;
return this;
}
@Override
public int getMirrorAckManagerPageAttempts() {
return this.mirrorAckManagerMaxPageAttempts;
}
@Override
public ConfigurationImpl setMirrorAckManagerPageAttempts(int maxPageAttempts) {
logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}", maxPageAttempts);
this.mirrorAckManagerMaxPageAttempts = maxPageAttempts;
return this;
}
@Override
public int getMirrorAckManagerRetryDelay() {
return mirrorAckManagerRetryDelay;
}
@Override
public ConfigurationImpl setMirrorAckManagerRetryDelay(int delay) {
logger.debug("Setting mirrorAckManagerRetryDelay = {}", delay);
this.mirrorAckManagerRetryDelay = delay;
return this;
}
@Override
public boolean isMirrorPageTransaction() {
return mirrorPageTransaction;
}
@Override
public Configuration setMirrorPageTransaction(boolean ignorePageTransactions) {
logger.debug("Setting mirrorIgnorePageTransactions={}", ignorePageTransactions);
this.mirrorPageTransaction = ignorePageTransactions;
return this;
}
// extend property utils with ability to auto-fill and locate from collections
// collection entries are identified by the name() property
private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean {

View File

@ -380,6 +380,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String ID_CACHE_SIZE = "id-cache-size";
private static final String MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = "mirror-ack-manager-queue-attempts";
private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = "mirror-ack-manager-page-attempts";
private static final String MIRROR_ACK_MANAGER_RETRY_DELAY = "mirror-ack-manager-retry-delay";
private static final String MIRROR_PAGE_TRANSACTION = "mirror-page-transaction";
private boolean validateAIO = false;
private boolean printPageMaxSizeUsed = false;
@ -849,6 +857,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setManagementRbacPrefix(getString(e, "management-rbac-prefix", config.getManagementRbacPrefix(), NO_CHECK));
config.setMirrorPageTransaction(getBoolean(e, MIRROR_PAGE_TRANSACTION, config.isMirrorPageTransaction()));
config.setMirrorAckManagerPageAttempts(getInteger(e, MIRROR_ACK_MANAGER_PAGE_ATTEMPTS, config.getMirrorAckManagerPageAttempts(), GT_ZERO));
config.setMirrorAckManagerQueueAttempts(getInteger(e, MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS, config.getMirrorAckManagerQueueAttempts(), GT_ZERO));
config.setMirrorAckManagerRetryDelay(getInteger(e, MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(), GT_ZERO));
parseAddressSettings(e, config);
parseResourceLimits(e, config);

View File

@ -1253,8 +1253,7 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
// not using page transaction if transaction is declared async
final long transactionID = (tx == null || tx.isAsync()) ? -1 : tx.getID();
final long transactionID = (tx != null && tx.isAllowPageTransaction()) ? tx.getID() : -1L;
if (pageDecorator != null) {
message = pageDecorator.apply(message);
@ -1273,7 +1272,7 @@ public class PagingStoreImpl implements PagingStore {
currentPageSize += bytesToWrite;
}
if (tx != null && !tx.isAsync()) {
if (tx != null && tx.isAllowPageTransaction()) {
installPageTransaction(tx, listCtx);
}

View File

@ -30,8 +30,8 @@ public final class AckRetry {
byte[] temporaryNodeBytes;
long messageID;
AckReason reason;
short pageAttempts;
short queueAttempts;
int pageAttempts;
int queueAttempts;
private static Persister persister = new Persister();
@ -41,7 +41,7 @@ public final class AckRetry {
@Override
public String toString() {
return "ACKRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + messageID + ", reason=" + reason + '}';
return "AckRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + messageID + ", reason=" + reason + ", pageAttempts=" + pageAttempts + ", queueAttempts=" + queueAttempts + '}';
}
public AckRetry() {
@ -92,19 +92,19 @@ public final class AckRetry {
return this;
}
public short getPageAttempts() {
public int getPageAttempts() {
return pageAttempts;
}
public short getQueueAttempts() {
public int getQueueAttempts() {
return queueAttempts;
}
public short attemptedPage() {
public int attemptedPage() {
return ++pageAttempts;
}
public short attemptedQueue() {
public int attemptedQueue() {
return ++queueAttempts;
}

View File

@ -109,4 +109,8 @@ public interface Transaction {
/** To be used on control transactions that are meant as internal and don't really require a hard sync. */
Transaction setAsync(boolean async);
default boolean isAllowPageTransaction() {
return true;
}
}

View File

@ -915,6 +915,48 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-ack-manager-queue-attempts" type="xsd:int" maxOccurs="1" minOccurs="0" default="5">
<xsd:annotation>
<xsd:documentation>
The number of times a mirror target would retry an acknowledgement on the queue before scanning page files for the message.
This is exposed as mirrorAckManagerQueueAttempts on broker properties.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-ack-manager-page-attempts" type="xsd:int" maxOccurs="1" minOccurs="0" default="2">
<xsd:annotation>
<xsd:documentation>
The number of times a mirror target would retry an acknowledgement on paging.
This is exposed as mirrorAckManagerPageAttempts on broker properties.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-ack-manager-retry-delay" type="xsd:int" maxOccurs="1" minOccurs="0" default="100">
<xsd:annotation>
<xsd:documentation>
Period in milliseconds for which retries are going to be exercised.
This is exposed as mirrorAckManagerRetryDelay on broker properties.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="mirror-page-transaction" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
<xsd:annotation>
<xsd:documentation>
Should Mirror use Page Transactions When target destinations is paging?
When a target queue on the mirror is paged, the mirror will not record a page transaction for every message.
The default is false, and the overhead of paged messages will be smaller, but there is a possibility of eventual duplicates in case of interrupted communication between the mirror source and target.
If you set this to true there will be a record stored on the journal for the page-transaction additionally to the record in the page store.
This is exposed as mirrorPageTransactions on broker properties.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="suppress-session-notifications" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -507,6 +507,11 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp());
assertEquals(Integer.valueOf(500), conf.getAddressSettings().get("a2").getIDCacheSize());
Assert.assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
Assert.assertEquals(222, conf.getMirrorAckManagerPageAttempts());
Assert.assertEquals(333, conf.getMirrorAckManagerRetryDelay());
Assert.assertTrue(conf.isMirrorPageTransaction());
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
assertEquals(13, conf.getResourceLimitSettings().get("myUser").getMaxQueues());

View File

@ -541,6 +541,12 @@
<network-check-ping-command>ping-four</network-check-ping-command>
<network-check-ping6-command>ping-six</network-check-ping6-command>
<page-sync-timeout>1000</page-sync-timeout>
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-page-transaction>true</mirror-page-transaction>
<security-settings>
<security-setting match="a1">
<permission type="createNonDurableQueue" roles="a1.1"/>

View File

@ -70,6 +70,12 @@
<critical-analyzer-timeout>777</critical-analyzer-timeout>
<critical-analyzer>false</critical-analyzer>
<literal-match-markers>()</literal-match-markers>
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-page-transaction>true</mirror-page-transaction>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>

View File

@ -70,6 +70,13 @@
<critical-analyzer-timeout>777</critical-analyzer-timeout>
<critical-analyzer>false</critical-analyzer>
<literal-match-markers>()</literal-match-markers>
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
<mirror-page-transaction>true</mirror-page-transaction>
<xi:include href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-incoming-interceptors.xml"/>
<xi:include href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-outgoing-interceptors.xml"/>
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>

View File

@ -136,7 +136,7 @@ public class AckManagerTest extends ActiveMQTestBase {
ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1);
{
AckManager ackManager = AckManagerProvider.getManager(server1, false);
AckManager ackManager = AckManagerProvider.getManager(server1);
AtomicInteger counter = new AtomicInteger(0);
@ -161,7 +161,8 @@ public class AckManagerTest extends ActiveMQTestBase {
// in this following loop we will get the ackManager, compare the stored retries. stop the server and validate if they were reloaded correctly
for (int repeat = 0; repeat < 2; repeat++) {
logger.info("Repeating {}", repeat);
AckManager ackManager = AckManagerProvider.getManager(server1, true);
AckManager ackManager = AckManagerProvider.getManager(server1);
ackManager.start();
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = ackManager.sortRetries();
@ -179,19 +180,22 @@ public class AckManagerTest extends ActiveMQTestBase {
Wait.assertEquals(numberOfMessages, c1s1::getMessageCount);
Wait.assertEquals(numberOfMessages, c2s2::getMessageCount);
AckManager originalManager = AckManagerProvider.getManager(server1, false);
AckManager originalManager = AckManagerProvider.getManager(server1);
server1.stop();
Assert.assertEquals(0, AckManagerProvider.getSize());
server1.start();
AckManager newManager = AckManagerProvider.getManager(server1, false);
AckManager newManager = AckManagerProvider.getManager(server1);
Assert.assertEquals(1, AckManagerProvider.getSize());
Assert.assertNotSame(originalManager, AckManagerProvider.getManager(server1, true));
Assert.assertNotSame(originalManager, AckManagerProvider.getManager(server1));
AckManager manager = AckManagerProvider.getManager(server1);
Wait.assertTrue(manager::isStarted, 5_000);
Assert.assertEquals(1, AckManagerProvider.getSize());
Assert.assertNotSame(newManager, ackManager);
}
AckManager ackManager = AckManagerProvider.getManager(server1, true);
AckManager ackManager = AckManagerProvider.getManager(server1);
ackManager.start();
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = ackManager.sortRetries();
Assert.assertEquals(1, sortedRetries.size());
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksOnAddress = sortedRetries.get(c1s1.getAddress());

View File

@ -0,0 +1,311 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.TestParameters;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingleMirrorSoakTest extends SoakTestBase {
private static final String TEST_NAME = "SINGLE_MIRROR_SOAK";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's
private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false"));
private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500);
private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100);
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
private static final String TOPIC_NAME = "topicTest";
private static String body;
static {
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 30 * 1024) {
writer.append("The sky is blue, ..... watch out for poop from the birds though!...");
}
body = writer.toString();
}
public static final String DC1_NODE = "SingleMirrorSoakTest/DC1";
public static final String DC2_NODE = "SingleMirrorSoakTest/DC2";
volatile Process processDC1;
volatile Process processDC2;
@After
public void destroyServers() {
if (processDC1 != null) {
processDC1.destroyForcibly();
}
if (processDC2 != null) {
processDC2.destroyForcibly();
}
}
private static final String DC1_URI = "tcp://localhost:61616";
private static final String DC2_URI = "tcp://localhost:61618";
private static void createServer(String serverName,
String connectionName,
String mirrorURI,
int porOffset,
boolean paging) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(false);
cliCreateServer.setNoWeb(false);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.addArgs("--addresses", TOPIC_NAME);
cliCreateServer.setPortOffset(porOffset);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("mirrorAckManagerPageAttempts", "10");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
File brokerXml = new File(serverLocation, "/etc/broker.xml");
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
if (paging) {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-bytes>20M</max-read-page-bytes>", "<max-read-page-bytes>-1</max-read-page-bytes>"));
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-messages>-1</max-read-page-messages>", "<max-read-page-messages>100000</max-read-page-messages>\n" + " <prefetch-page-messages>10000</prefetch-page-messages>"));
}
if (TRACE_LOGS) {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
Assert.assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" +
"\n" + "logger.ack.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
+ "logger.ack.level=TRACE\n"
+ "logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n"
+ "logger.config.level=TRACE\n"
+ "appender.console.filter.threshold.type = ThresholdFilter\n"
+ "appender.console.filter.threshold.level = info"));
}
}
public static void createRealServers(boolean paging) throws Exception {
createServer(DC1_NODE, "mirror", DC2_URI, 0, paging);
createServer(DC2_NODE, "mirror", DC1_URI, 2, paging);
}
private void startServers() throws Exception {
processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties"));
processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
ServerUtil.waitForServerToStart(2, 10_000);
}
@Test
public void testInterruptedMirrorTransfer() throws Exception {
createRealServers(true);
startServers();
Assert.assertTrue(KILL_INTERNAL > SEND_COMMIT);
String clientIDA = "nodeA";
String clientIDB = "nodeB";
String subscriptionID = "my-order";
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_URI);
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT);
consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT);
SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null, null);
SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null, null);
runAfter(() -> managementDC1.close());
runAfter(() -> managementDC2.close());
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID));
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID));
ExecutorService executorService = Executors.newFixedThreadPool(3);
runAfter(executorService::shutdownNow);
executorService.execute(() -> {
try {
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
executorService.execute(() -> {
try {
consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
OrderedExecutor restartExeuctor = new OrderedExecutor(executorService);
AtomicBoolean running = new AtomicBoolean(true);
runAfter(() -> running.set(false));
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME));
for (int i = 0; i < NUMBER_MESSAGES; i++) {
TextMessage message = session.createTextMessage(body);
message.setIntProperty("i", i);
message.setBooleanProperty("large", false);
producer.send(message);
if (i > 0 && i % SEND_COMMIT == 0) {
logger.info("Sent {} messages", i);
session.commit();
}
if (i > 0 && i % KILL_INTERNAL == 0) {
restartExeuctor.execute(() -> {
if (running.get()) {
try {
logger.info("Restarting target server (DC2)");
if (processDC2 != null) {
processDC2.destroyForcibly();
processDC2.waitFor(1, TimeUnit.MINUTES);
processDC2 = null;
}
processDC2 = startServer(DC2_NODE, 2, 10_000, new File(getServerLocation(DC2_NODE), "broker.properties"));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
});
}
}
session.commit();
running.set(false);
}
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000);
}
private static void consume(ConnectionFactory factory,
String clientID,
String subscriptionID,
int start,
int numberOfMessages,
boolean expectEmpty,
boolean assertBody,
int batchCommit) throws Exception {
try (Connection connection = factory.createConnection()) {
connection.setClientID(clientID);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
connection.start();
MessageConsumer consumer = session.createDurableConsumer(topic, subscriptionID);
boolean failed = false;
int pendingCommit = 0;
for (int i = start; i < start + numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(10_000);
Assert.assertNotNull(message);
logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
if (message.getIntProperty("i") != i) {
failed = true;
logger.warn("Expected message {} but got {}", i, message.getIntProperty("i"));
}
logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
pendingCommit++;
if (pendingCommit >= batchCommit) {
logger.info("received {}", i);
session.commit();
pendingCommit = 0;
}
}
session.commit();
Assert.assertFalse(failed);
if (expectEmpty) {
Assert.assertNull(consumer.receiveNoWait());
}
}
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return -1;
}
}
}

View File

@ -141,4 +141,11 @@ export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false
export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500
export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100
export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100
export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500
export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000