ARTEMIS-5010 AckManager records are not replicated

This commit is contained in:
Clebert Suconic 2024-08-22 21:39:08 -04:00 committed by clebertsuconic
parent 8a56d2ea33
commit 4ba9f67d80
20 changed files with 611 additions and 243 deletions

View File

@ -32,7 +32,6 @@ import java.util.function.Supplier;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -109,7 +108,7 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
}
}
public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
public JournalHashMap(long collectionId, MapStorageManager journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
this.collectionId = collectionId;
this.journal = journal;
this.idGenerator = idGenerator;
@ -126,7 +125,7 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
private final Persister<MapRecord<K, V>> persister;
private final Journal journal;
private final MapStorageManager journal;
private final long collectionId;
@ -221,9 +220,9 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
}
if (callback == null) {
journal.appendAddRecord(record.id, recordType, persister, record, false);
journal.storeMapRecord(record.id, recordType, persister, record, false);
} else {
journal.appendAddRecord(record.id, recordType, persister, record, true, callback);
journal.storeMapRecord(record.id, recordType, persister, record, true, callback);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
@ -234,7 +233,7 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
// callers must be synchronized
private void removed(MapRecord<K, V> record) {
try {
journal.appendDeleteRecord(record.id, false);
journal.deleteMapRecord(record.id, false);
} catch (Exception e) {
exceptionListener.onIOException(e, e.getMessage(), null);
}
@ -243,7 +242,7 @@ public class JournalHashMap<K, V, C> implements Map<K, V> {
// callers must be synchronized
private void removed(MapRecord<K, V> record, long txid) {
try {
journal.appendDeleteRecordTransactional(txid, record.id);
journal.deleteMapRecordTx(txid, record.id);
} catch (Exception e) {
exceptionListener.onIOException(e, e.getMessage(), null);
}

View File

@ -27,13 +27,12 @@ import java.util.function.Supplier;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.persistence.Persister;
public class JournalHashMapProvider<K, V, C> {
final Journal journal;
final MapStorageManager journal;
final Persister<JournalHashMap.MapRecord<K, V>> persister;
final LongObjectHashMap<JournalHashMap<K, V, C>> journalMaps = new LongObjectHashMap<>();
final LongSupplier idSupplier;
@ -42,7 +41,7 @@ public class JournalHashMapProvider<K, V, C> {
final Supplier<IOCompletion> ioCompletionSupplier;
final LongFunction<C> contextProvider;
public JournalHashMapProvider(LongSupplier idSupplier, Journal journal, AbstractHashMapPersister<K, V> persister, byte recordType, Supplier<IOCompletion> ioCompletionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
public JournalHashMapProvider(LongSupplier idSupplier, MapStorageManager journal, AbstractHashMapPersister<K, V> persister, byte recordType, Supplier<IOCompletion> ioCompletionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
this.idSupplier = idSupplier;
this.persister = persister;
this.journal = journal;

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal.collections;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.persistence.Persister;
public interface MapStorageManager {
void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception;
void storeMapRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception;
void deleteMapRecord(long id, boolean sync) throws Exception;
void deleteMapRecordTx(long txid, long id) throws Exception;
}

View File

@ -108,6 +108,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private Long amqpIdleTimeout;
private long ackManagerFlushTimeout = 10_000;
private boolean directDeliver = true;
private final AMQPRoutingHandler routingHandler;
@ -145,6 +147,17 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
}
/** Before the ackManager retries acks, it must flush the OperationContext on the MirrorTargets.
* This is the timeout is in milliseconds*/
public long getAckManagerFlushTimeout() {
return ackManagerFlushTimeout;
}
public ProtonProtocolManager setAckManagerFlushTimeout(long ackManagerFlushTimeout) {
this.ackManagerFlushTimeout = ackManagerFlushTimeout;
return this;
}
public int getAmqpMinLargeMessageSize() {
return amqpMinLargeMessageSize;
}

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
@ -63,6 +64,8 @@ import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
@ -177,6 +180,48 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
private AckManager ackManager;
/** This method will wait both replication and storage to finish their current operations. */
public void flush() {
CountDownLatch latch = new CountDownLatch(1);
connection.runNow(() -> {
OperationContext oldContext = OperationContextImpl.getContext();
try {
OperationContextImpl.setContext(mirrorContext);
mirrorContext.executeOnCompletion(new IOCallback() {
@Override
public void done() {
latch.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
logger.warn("error code = {} / message = {}", errorCode, errorMessage);
latch.countDown();
}
});
} finally {
OperationContextImpl.setContext(oldContext);
}
});
long timeout;
try {
timeout = connection.getProtocolManager().getAckManagerFlushTimeout();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
timeout = 10_000;
}
try {
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
ActiveMQAMQPProtocolLogger.LOGGER.timedOutAckManager(timeout);
}
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,

View File

@ -31,7 +31,6 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
@ -53,6 +52,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +63,6 @@ public class AckManager implements ActiveMQComponent {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final Journal journal;
final LongSupplier sequenceGenerator;
final JournalHashMapProvider<AckRetry, AckRetry, Queue> journalHashMapProvider;
final ActiveMQServer server;
@ -77,9 +76,10 @@ public class AckManager implements ActiveMQComponent {
this.server = server;
this.configuration = server.getConfiguration();
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
this.journal = server.getStorageManager().getMessageJournal();
this.sequenceGenerator = server.getStorageManager()::generateID;
journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, journal, AckRetry.getPersister(), JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, server.getPostOffice()::findQueue, server.getIoCriticalErrorListener());
// The JournalHashMap has to use the storage manager to guarantee we are using the Replicated Journal Wrapper in case this is a replicated journal
journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, server.getStorageManager(), AckRetry.getPersister(), JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, server.getPostOffice()::findQueue, server.getIoCriticalErrorListener());
this.referenceIDSupplier = new ReferenceIDSupplier(server);
}
@ -149,15 +149,32 @@ public class AckManager implements ActiveMQComponent {
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retries = sortRetries();
scanAndFlushMirrorTargets();
if (retries.isEmpty()) {
logger.trace("Nothing to retry!, server={}", server);
return false;
}
progress = new MultiStepProgress(sortRetries());
progress = new MultiStepProgress(retries);
return true;
}
private void scanAndFlushMirrorTargets() {
logger.debug("scanning and flushing mirror targets");
// this will navigate on each connection, find the connection that has a mirror controller, and call flushMirrorTarget for each MirrorTargets. (it should be 1 in most cases)
// An alternative design instead of going through the connections, would be to register the MirrorTargets within the AckManager, however to avoid memory leaks after disconnects and reconnects it is safer to
// scan through the connections
server.getRemotingService().getConnections().stream().
filter(c -> c instanceof ActiveMQProtonRemotingConnection && ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets() != null).
forEach(c -> ((ActiveMQProtonRemotingConnection) c).getAmqpConnection().getMirrorControllerTargets().forEach(this::flushMirrorTarget));
}
private void flushMirrorTarget(AMQPMirrorControllerTarget target) {
logger.debug("Flushing mirror {}", target);
target.flush();
}
// Sort the ACK list by address
// We have the retries by queue, we need to sort them by address
// as we will perform all the retries on the same addresses at the same time (in the Multicast case with multiple queues acking)

View File

@ -58,4 +58,7 @@ public interface ActiveMQAMQPProtocolLogger {
@LogMessage(id = 111007, value = "Invalid Connection State: {} for remote IP {}", level = LogMessage.Level.WARN)
void invalidAMQPConnectionState(Object state, Object remoteIP);
@LogMessage(id = 111008, value = "The AckManager timed out waiting for operations to complete on the MirrorTarget. timeout = {} milliseconds", level = LogMessage.Level.WARN)
void timedOutAckManager(long timeout);
}

View File

@ -17,9 +17,11 @@
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -44,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationAddressSenderController;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueSenderController;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
@ -108,6 +111,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
flush();
}
private List<AMQPMirrorControllerTarget> mirrorControllerTargets;
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);
@ -198,6 +203,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
public List<AMQPMirrorControllerTarget> getMirrorControllerTargets() {
return mirrorControllerTargets;
}
public AMQPConnectionContext addMirrorControllerTarget(AMQPMirrorControllerTarget mirrorControllerTarget) {
if (mirrorControllerTargets == null) {
mirrorControllerTargets = new ArrayList<>();
}
mirrorControllerTargets.add(mirrorControllerTarget);
return this;
}
public boolean isLargeMessageSync() {
return connectionCallback.isLargeMessageSync();
}

View File

@ -314,6 +314,8 @@ public class AMQPSessionContext extends ProtonInitializable {
final AMQPMirrorControllerTarget protonReceiver =
new AMQPMirrorControllerTarget(sessionSPI, connection, this, receiver, server);
connection.addMirrorControllerTarget(protonReceiver);
final HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, server.getNodeID().toString());
receiver.setProperties(brokerIDProperties);

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.collections.MapStorageManager;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -76,7 +77,7 @@ import org.apache.activemq.artemis.utils.IDGenerator;
* I couldn't just get the IDGenerator from the inner part because the NullPersistent has its own sequence.
* So the best was to add the interface and adjust the callers for the method
*/
public interface StorageManager extends IDGenerator, ActiveMQComponent {
public interface StorageManager extends MapStorageManager, IDGenerator, ActiveMQComponent {
default long getMaxRecordSize() {
/** Null journal is pretty much memory */

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -69,6 +70,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -387,6 +389,45 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
}
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendAddRecord(id, recordType, persister, record, sync);
}
}
@Override
public void deleteMapRecord(long id, boolean sync) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(id, sync);
}
}
@Override
public void deleteMapRecordTx(long txid, long id) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecordTransactional(txid, id);
}
}
@Override
public void storeMessage(final Message message) throws Exception {
if (message.getMessageID() <= 0) {

View File

@ -54,17 +54,13 @@ public final class AckRetry {
}
public byte[] getTemporaryNodeBytes() {
public synchronized byte[] getNodeIDBytes() {
if (temporaryNodeBytes == null) {
temporaryNodeBytes = nodeID.getBytes(StandardCharsets.US_ASCII);
}
return temporaryNodeBytes;
}
public void clearTemporaryNodeBytes() {
this.temporaryNodeBytes = null;
}
public String getNodeID() {
return nodeID;
}
@ -138,7 +134,7 @@ public final class AckRetry {
@Override
protected int getKeySize(AckRetry key) {
return DataConstants.SIZE_INT +
(key.getNodeID() == null ? 0 : key.getTemporaryNodeBytes().length) +
(key.getNodeID() == null ? 0 : key.getNodeIDBytes().length) +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE;
}
@ -148,13 +144,12 @@ public final class AckRetry {
if (key.getNodeID() == null) {
buffer.writeInt(0);
} else {
byte[] temporaryNodeBytes = key.getTemporaryNodeBytes();
byte[] temporaryNodeBytes = key.getNodeIDBytes();
buffer.writeInt(temporaryNodeBytes.length);
buffer.writeBytes(temporaryNodeBytes);
}
buffer.writeLong(key.messageID);
buffer.writeByte(key.reason.getVal());
key.clearTemporaryNodeBytes();
}
@Override

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -45,6 +46,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -735,5 +737,32 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync) throws Exception {
}
@Override
public void deleteMapRecord(long id, boolean sync) throws Exception {
}
@Override
public void deleteMapRecordTx(long txid, long id) throws Exception {
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -47,6 +48,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting;
@ -940,7 +942,31 @@ public class TransactionImplTest extends ServerTestBase {
@Override
public void deleteAddressStatus(long recordID) throws Exception {
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync) throws Exception {
}
@Override
public void deleteMapRecord(long id, boolean sync) throws Exception {
}
@Override
public void deleteMapRecordTx(long txid, long id) throws Exception {
}
}

View File

@ -230,6 +230,12 @@ public abstract class ActiveMQTestBase extends ArtemisTestCase {
}
protected static String randomProtocol() {
String[] protocols = {"AMQP", "OPENWIRE", "CORE"};
return protocols[org.apache.activemq.artemis.tests.util.RandomUtil.randomPositiveInt() % 3];
}
protected <T> T serialClone(Object object) throws Exception {
logger.debug("object::{}", object);
ByteArrayOutputStream bout = new ByteArrayOutputStream();

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -59,6 +60,7 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting;
@ -600,6 +602,36 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.deletePageTransactional(recordID);
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
manager.storeMapRecord(id, recordType, persister, record, sync, completionCallback);
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync) throws Exception {
manager.storeMapRecord(id, recordType, persister, record, sync);
}
@Override
public void deleteMapRecord(long id, boolean sync) throws Exception {
manager.deleteMapRecord(id, sync);
}
@Override
public void deleteMapRecordTx(long txid, long id) throws Exception {
manager.deleteMapRecordTx(txid, id);
}
@Override
public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,

View File

@ -27,12 +27,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.collections.AbstractHashMapPersister;
import org.apache.activemq.artemis.core.journal.collections.MapStorageManager;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@ -43,6 +47,44 @@ import org.junit.jupiter.api.Test;
public class JournalHashMapTest extends ActiveMQTestBase {
static class JournalManager implements MapStorageManager {
final Journal journal;
JournalManager(Journal journal) {
this.journal = journal;
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion completionCallback) throws Exception {
journal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
}
@Override
public void storeMapRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync) throws Exception {
journal.appendAddRecord(id, recordType, persister, record, sync);
}
@Override
public void deleteMapRecord(long id, boolean sync) throws Exception {
journal.appendDeleteRecord(id, sync);
}
@Override
public void deleteMapRecordTx(long txid, long id) throws Exception {
journal.appendDeleteRecordTransactional(txid, id);
}
}
@Test
public void testHashMap() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
@ -60,7 +102,7 @@ public class JournalHashMapTest extends ActiveMQTestBase {
AtomicLong sequence = new AtomicLong(1);
JournalHashMapProvider<Long, Long, Object> journalHashMapProvider = new JournalHashMapProvider(sequence::incrementAndGet, journal, new LongPersister(), (byte)3, OperationContextImpl::getContext, l -> null, (e, m, f) -> {
JournalHashMapProvider<Long, Long, Object> journalHashMapProvider = new JournalHashMapProvider(sequence::incrementAndGet, new JournalManager(journal), new LongPersister(), (byte)3, OperationContextImpl::getContext, l -> null, (e, m, f) -> {
e.printStackTrace();
});

View File

@ -25,6 +25,7 @@ import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -32,8 +33,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
@ -41,21 +44,34 @@ import org.apache.activemq.artemis.core.config.ha.DistributedLockManagerConfigur
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RepeatStartBackupTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -74,7 +90,6 @@ public class RepeatStartBackupTest extends ActiveMQTestBase {
@Override
public void setUp() throws Exception {
managerConfiguration = new DistributedLockManagerConfiguration(FileBasedLockManager.class.getName(), Collections.singletonMap("locks-folder", newTemporaryFolder("manager").toString()));
final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
// start live
Configuration liveConfiguration = createLiveConfiguration();
@ -93,6 +108,9 @@ public class RepeatStartBackupTest extends ActiveMQTestBase {
((ReplicationBackupPolicyConfiguration) backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true);
backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
backupServer.setIdentity("BACKUP");
}
private void startBackup(int timeout) throws Exception {
backupServer.start();
Wait.waitFor(backupServer::isStarted);
@ -102,6 +120,7 @@ public class RepeatStartBackupTest extends ActiveMQTestBase {
@Test
public void testLoopStart() throws Exception {
startBackup(30_000);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
@ -124,7 +143,7 @@ public class RepeatStartBackupTest extends ActiveMQTestBase {
connection.start();
while (running.get()) {
producer.send(session.createTextMessage("hello"));
Assertions.assertNotNull(consumer.receive(1000));
assertNotNull(consumer.receive(1000));
}
}
} catch (Throwable e) {
@ -145,19 +164,125 @@ public class RepeatStartBackupTest extends ActiveMQTestBase {
Wait.assertTrue(backupServer::isReplicaSync);
}
Assertions.assertFalse(loggerHandler.findText("AMQ229254"));
Assertions.assertFalse(loggerHandler.findText("AMQ229006"));
assertFalse(loggerHandler.findText("AMQ229254"));
assertFalse(loggerHandler.findText("AMQ229006"));
loggerHandler.clear();
}
running.set(false);
Assertions.assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(latch.await(10, TimeUnit.SECONDS));
Assertions.assertEquals(0, errors.get());
assertEquals(0, errors.get());
}
}
@Test
public void testAckManagerRepetition() throws Exception {
String queueName = "queue_" + RandomUtil.randomString();
// some extremely large retry settings
// just to make sure these records will never be removed
server.getConfiguration().setMirrorAckManagerQueueAttempts(300000);
server.getConfiguration().setMirrorAckManagerPageAttempts(300000);
server.getConfiguration().setMirrorAckManagerRetryDelay(60_000);
backupServer.getConfiguration().setMirrorAckManagerQueueAttempts(300000);
backupServer.getConfiguration().setMirrorAckManagerPageAttempts(300000);
backupServer.getConfiguration().setMirrorAckManagerRetryDelay(60_000);
ExecutorService executorService = Executors.newFixedThreadPool(2);
runAfter(executorService::shutdownNow);
AtomicInteger errors = new AtomicInteger(0);
AtomicBoolean running = new AtomicBoolean(true);
runAfter(() -> running.set(false));
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch backupStarted = new CountDownLatch(1);
AtomicInteger recordsSent = new AtomicInteger(0);
int starBackupAt = 100;
assertFalse(server.isReplicaSync());
assertFalse(backupServer.isStarted());
AckManager liveAckManager = AckManagerProvider.getManager(server);
server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST));
Queue queueOnServerLive = server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(true));
long queueIdOnServerLive = queueOnServerLive.getID();
OperationContextImpl context = new OperationContextImpl(server.getExecutorFactory().getExecutor());
executorService.execute(() -> {
try {
OperationContextImpl.setContext(context);
while (running.get()) {
int id = recordsSent.getAndIncrement();
if (id == starBackupAt) {
executorService.execute(() -> {
try {
backupServer.start();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
backupStarted.countDown();
}
});
}
CountDownLatch latchAcked = new CountDownLatch(1);
liveAckManager.ack(server.getNodeID().toString(), queueOnServerLive, id, AckReason.NORMAL, true);
OperationContextImpl.getContext().executeOnCompletion(new IOCallback() {
@Override
public void done() {
latchAcked.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
if (!latchAcked.await(10, TimeUnit.SECONDS)) {
logger.warn("Could not wait ack to finish");
}
Thread.yield();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latch.countDown();
}
});
assertTrue(backupStarted.await(10, TimeUnit.SECONDS));
Wait.assertTrue(server::isReplicaSync);
Wait.assertTrue(() -> recordsSent.get() > 200);
running.set(false);
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(0, errors.get());
validateAckManager(server, queueName, queueIdOnServerLive, recordsSent.get());
server.stop();
Wait.assertTrue(backupServer::isActive);
validateAckManager(backupServer, queueName, queueIdOnServerLive, recordsSent.get());
}
private void validateAckManager(ActiveMQServer server,
String queueName,
long queueIdOnServerLive,
int messagesSent) {
AckManager liveManager = AckManagerProvider.getManager(server);
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = liveManager.sortRetries();
assertEquals(1, sortedRetries.size());
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> retryAddress = sortedRetries.get(SimpleString.of(queueName));
JournalHashMap<AckRetry, AckRetry, Queue> journalHashMapBackup = retryAddress.get(queueIdOnServerLive);
assertEquals(messagesSent, journalHashMapBackup.size());
}
protected HAPolicyConfiguration createReplicationLiveConfiguration() {

View File

@ -1,182 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QuickAckMirrorTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String QUEUE_NAME = "myQueue";
public static final String DC1_NODE_A = "ImmediateAckIdempotentTest/DC1";
public static final String DC2_NODE_A = "ImmediateAckIdempotentTest/DC2";
Process processDC1_node_A;
Process processDC2_node_A;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static void createServer(String serverName,
String connectionName,
String mirrorURI,
int portOffset) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(false);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.setPortOffset(portOffset);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
/* one way to show missed ACKs would be by setting:
*
* mirrorAckManagerMinQueueAttempts=1
* mirrorAckManagerMaxPageAttempts=1
* mirrorAckManagerRetryDelay=1
*
*
* the retry will be faster than the message would arrive at the queue
* */
brokerProperties.put("mirrorAckManagerMinQueueAttempts", "10");
brokerProperties.put("mirrorAckManagerMaxPageAttempts", "10");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
// introducing more delay in storage
Assertions.assertTrue(FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"), "</journal-file-size>", "</journal-file-size>\n <journal-buffer-timeout>20000000</journal-buffer-timeout>"));
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
@BeforeAll
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
}
@BeforeEach
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
}
@Test
public void testQuickACKAMQP() throws Exception {
testQuickACK("AMQP");
}
@Test
public void testQuickACKCORE() throws Exception {
testQuickACK("CORE");
}
@Test
public void testQuickACKOpenWire() throws Exception {
testQuickACK("OPENWIRE");
}
private void testQuickACK(final String protocol) throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
final int numberOfMessages = 1_000;
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numberOfMessages; i++) {
if (i % 100 == 0) {
logger.info("Sent and received {}", i);
}
String text = "hello hello hello " + RandomUtil.randomString();
producer.send(session.createTextMessage(text));
TextMessage textMessage = (TextMessage) consumer.receive(5000);
Assertions.assertNotNull(textMessage);
Assertions.assertEquals(text, textMessage.getText());
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
try {
Thread.sleep(100);
} catch (Throwable ignored) {
}
}
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME));
}
}

View File

@ -22,6 +22,7 @@ import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.BufferedReader;
@ -56,6 +57,8 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
@ -136,19 +139,19 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
private void startDC2(SimpleManagement managementDC2) throws Exception {
processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
Wait.assertTrue(managementDC2::isReplicaSync);
}
private void startDC1(SimpleManagement managementDC1) throws Exception {
processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
processDC1_REPLICA = startServer(DC1_REPLICA_NODE, -1, -1, new File(getServerLocation(DC1_REPLICA_NODE), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
Wait.assertTrue(managementDC1::isReplicaSync);
}
private static void createMirroredServer(String serverName,
private static void createMirroredServer(boolean paging, String serverName,
String connectionName,
String mirrorURI,
int portOffset,
@ -163,7 +166,6 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.setPortOffset(portOffset);
@ -185,19 +187,19 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "50");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
if (paging) {
brokerProperties.put("addressSettings.#.maxSizeMessages", "50");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
brokerProperties.put("mirrorPageTransaction", "true");
}
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
File brokerXml = new File(serverLocation, "/etc/broker.xml");
assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay>\n"));
assertTrue(FileUtil.findReplace(brokerXml, "<page-size-bytes>10M</page-size-bytes>", "<page-size-bytes>100K</page-size-bytes>"));
if (TRACE_LOGS) {
@ -208,10 +210,19 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
private static void replaceLogs(File serverLocation) throws Exception {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" + "\n" + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" + "logger.endpoint.level=DEBUG\n" + "appender.console.filter.threshold.type = ThresholdFilter\n" + "appender.console.filter.threshold.level = info"));
assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO",
"logger.artemis_utils.level=INFO\n" + "\n" +
"logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n" +
"logger.endpoint.level=INFO\n" +
"logger.ack.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n" +
"logger.ack.level=TRACE\n" +
"logger.mirrorTarget.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget\n" +
"logger.mirrorTarget.level=INFO\n" +
"appender.console.filter.threshold.type = ThresholdFilter\n" +
"appender.console.filter.threshold.level = info"));
}
private static void createMirroredBackupServer(String serverName, int portOffset, String clusterStatic, String mirrorURI) throws Exception {
private static void createMirroredBackupServer(boolean paging, String serverName, int portOffset, String clusterStatic, String mirrorURI) throws Exception {
File serverLocation = getFileServerLocation(serverName);
if (REUSE_SERVERS && serverLocation.exists()) {
deleteDirectory(new File(serverLocation, "data"));
@ -222,7 +233,6 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
cliCreateServer.setPortOffset(portOffset);
cliCreateServer.setClustered(true);
@ -239,32 +249,39 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
brokerProperties.put("AMQPConnections.mirror.connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
brokerProperties.put("mirrorAckManagerQueueAttempts", "5");
brokerProperties.put("mirrorAckManagerPageAttempts", "500000");
brokerProperties.put("mirrorAckManagerRetryDelay", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
if (paging) {
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
}
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
File brokerXml = new File(serverLocation, "/etc/broker.xml");
assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
assertTrue(FileUtil.findReplace(brokerXml, "<page-size-bytes>10M</page-size-bytes>", "<page-size-bytes>100K</page-size-bytes>"));
if (TRACE_LOGS) {
replaceLogs(serverLocation);
}
}
public static void createRealServers() throws Exception {
createMirroredServer(DC1_NODE, "mirror", uriWithAlternate(DC2_IP, DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP));
createMirroredBackupServer(DC1_REPLICA_NODE, 1, uri(DC1_IP), uriWithAlternate(DC2_IP, DC2_BACKUP_IP));
createMirroredServer(DC2_NODE, "mirror", uriWithAlternate(DC1_IP, DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP));
createMirroredBackupServer(DC2_REPLICA_NODE, 3, uri(DC2_IP), uriWithAlternate(DC1_IP, DC1_BACKUP_IP));
public static void createRealServers(boolean paging) throws Exception {
createMirroredServer(paging, DC1_NODE, "mirror", uriWithAlternate(DC2_IP, DC2_BACKUP_IP), 0, true, uri(DC1_BACKUP_IP));
createMirroredBackupServer(paging, DC1_REPLICA_NODE, 1, uri(DC1_IP), uriWithAlternate(DC2_IP, DC2_BACKUP_IP));
createMirroredServer(paging, DC2_NODE, "mirror", uriWithAlternate(DC1_IP, DC1_BACKUP_IP), 2, true, uri(DC2_BACKUP_IP));
createMirroredBackupServer(paging, DC2_REPLICA_NODE, 3, uri(DC2_IP), uriWithAlternate(DC1_IP, DC1_BACKUP_IP));
}
@Test
@ -278,7 +295,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
}
private void testMirror(boolean laterStart) throws Exception {
createRealServers();
createRealServers(true);
SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null);
SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null);
@ -328,6 +345,106 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
}
}
@Test
public void testQuickACKRandomProtocol() throws Exception {
String protocol = randomProtocol();
logger.info("using protocol {}", protocol);
// There shouldn't be any semantic difference for the test based on the protocol we choose to run
// However I will make this a random choice to make sure we cover eventually all the protocols.
// I didn't want to waste time on the testsuite/CI running all 3 protocols on every run for this case
testQuickACK(protocol);
}
private void testQuickACK(final String protocol) throws Exception {
createRealServers(false);
SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null);
SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null);
SimpleManagement managementDC2Backup = new SimpleManagement(uri(DC2_BACKUP_IP), null, null);
startDC1(managementDC1);
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, uri(DC1_IP));
final int startAt = 300;
final int killAt = 800;
final int totalMessages = 1000;
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < totalMessages; i++) {
if (i % 100 == 0) {
logger.info("Sent and received {}", i);
}
if (i == startAt) {
// lazy start to allow messages accumulated in the SNF
processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_REPLICA_NODE), "broker.properties"));
} else if (i == killAt) { // kill the live on DC2
logger.info("KillAt {}", killAt);
ServerUtil.waitForServerToStart(1, 10_000);
Wait.assertTrue(managementDC2::isReplicaSync);
processDC2.destroyForcibly();
assertTrue(processDC2.waitFor(10, TimeUnit.SECONDS));
}
String text = "hello hello hello " + i;
TextMessage message = session.createTextMessage(text);
message.setIntProperty("i", i);
producer.send(message);
TextMessage textMessage = (TextMessage) consumer.receive(5000);
Assertions.assertNotNull(textMessage);
Assertions.assertEquals(text, textMessage.getText());
}
}
final int oddSend = 33;
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < oddSend; i++) {
TextMessage message = session.createTextMessage("oddSend " + i);
message.setIntProperty("oddSend", i);
producer.send(message);
}
session.commit();
}
Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue));
Wait.assertEquals(oddSend, () -> getMessageCount(managementDC1, QUEUE_NAME));
Wait.assertEquals(oddSend, () -> getMessageCount(managementDC2Backup, QUEUE_NAME));
ConnectionFactory cfDC2Backup = CFUtil.createConnectionFactory(protocol, uri(DC2_BACKUP_IP));
try (Connection connection = cfDC2Backup.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < oddSend; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assertions.assertNotNull(message);
assertEquals("oddSend " + i, message.getText());
}
assertNull(consumer.receiveNoWait());
session.commit();
}
Wait.assertEquals(0, () -> getMessageCount(managementDC1, snfQueue));
Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, snfQueue));
Wait.assertEquals(0, () -> getMessageCount(managementDC1, QUEUE_NAME));
Wait.assertEquals(0, () -> getMessageCount(managementDC2Backup, QUEUE_NAME));
}
@Test
public void testMultipleSenders() throws Exception {
@ -337,7 +454,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
logger.warn("lsof is not available in this platform, we will ignore this test - {}", e.getMessage(), e);
Assumptions.abort("lsof is not available");
}
createRealServers();
createRealServers(true);
SimpleManagement managementDC1 = new SimpleManagement(uri(DC1_IP), null, null);
SimpleManagement managementDC2 = new SimpleManagement(uri(DC2_IP), null, null);
@ -403,6 +520,7 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
}
private static void sendMessages(String queueName) throws JMSException {
long start = System.currentTimeMillis();
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", uri(DC1_IP));
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@ -415,6 +533,10 @@ public class ReplicatedBothNodesMirrorTest extends SoakTestBase {
if (i > 0 && i % SEND_COMMIT == 0) {
logger.info("Sent {} messages on {}", i, queueName);
session.commit();
long timePassed = System.currentTimeMillis() - start;
double secondsPassed = timePassed / 1000f;
logger.info("sent {} messages, msgs/second = {}", i, (i / secondsPassed));
}
}