ARTEMIS-4136 Mirrored sync replica

I am adding an option sync=true or false on mirror. if sync, any client blocking operation will wait a roundtrip to the mirror
acting like a sync replica.
This commit is contained in:
Clebert Suconic 2023-01-18 16:58:39 -05:00 committed by clebertsuconic
parent b0ba8cae24
commit 0d3cd8d880
30 changed files with 935 additions and 123 deletions

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
@ -148,8 +149,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
* It was written to check the deliveryAnnotationsForSendBuffer and eventually move it to the protocolData.
*/
public void checkReference(MessageReference reference) {
if (reference.getProtocolData() == null && deliveryAnnotationsForSendBuffer != null) {
reference.setProtocolData(deliveryAnnotationsForSendBuffer);
if (reference.getProtocolData(DeliveryAnnotations.class) == null && deliveryAnnotationsForSendBuffer != null) {
reference.setProtocolData(DeliveryAnnotations.class, deliveryAnnotationsForSendBuffer);
}
}

View File

@ -754,12 +754,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) {
ensureDataIsValid();
DeliveryAnnotations daToWrite;
DeliveryAnnotations daToWrite = reference != null ? reference.getProtocolData(DeliveryAnnotations.class) : null;
if (reference != null && reference.getProtocolData() instanceof DeliveryAnnotations) {
daToWrite = (DeliveryAnnotations) reference.getProtocolData();
} else {
// deliveryAnnotationsForSendBuffer was an old API form where a deliver could set it before deliver
if (reference == null) {
// deliveryAnnotationsForSendBuffer is part of an older API, deprecated but still present
daToWrite = deliveryAnnotationsForSendBuffer;
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
/** this will be used when there are multiple replicas in use. */
public class AMQPMirrorControllerAggregation implements MirrorController, ActiveMQComponent {
@ -72,6 +73,13 @@ public class AMQPMirrorControllerAggregation implements MirrorController, Active
return partitions;
}
@Override
public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
for (MirrorController partition : partitions) {
partition.preAcknowledge(tx, ref, reason);
}
}
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
for (MirrorController partition : partitions) {
@ -102,9 +110,9 @@ public class AMQPMirrorControllerAggregation implements MirrorController, Active
}
@Override
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
for (MirrorController partition : partitions) {
partition.sendMessage(message, context, refs);
partition.sendMessage(tx, message, context);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -24,6 +25,9 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -34,6 +38,9 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@ -86,6 +93,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
final boolean deleteQueues;
final MirrorAddressFilter addressFilter;
private final AMQPBrokerConnection brokerConnection;
private final boolean sync;
final AMQPMirrorBrokerConnectionElement replicaConfig;
@ -116,6 +124,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
this.acks = replicaConfig.isMessageAcknowledgements();
this.brokerConnection = brokerConnection;
this.sync = replicaConfig.isSync();
}
public Queue getSnfQueue() {
@ -216,60 +225,120 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
@Override
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);
if (invalidTarget(context.getMirrorSource())) {
logger.trace("server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
return;
}
if (context.isInternal()) {
logger.trace("server {} is discarding send to avoid sending to internal queue", server);
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
return;
}
if (ignoreAddress(address)) {
logger.trace("server {} is discarding send to address {}, address doesn't match filter", server, address);
logger.trace("sendMessage::server {} is discarding send to address {}, address doesn't match filter", server, address);
return;
}
logger.trace("{} send message {}", server, message);
logger.trace("sendMessage::{} send message {}", server, message);
try {
context.setReusable(false);
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
String nodeID = setProtocolData(idSupplier, ref);
String nodeID = idSupplier.getServerID(message);
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
logger.trace("Message {} already belonged to the node, {}, it won't circle send", message, getRemoteMirrorId());
logger.trace("sendMessage::Message {} already belonged to the node, {}, it won't circle send", message, getRemoteMirrorId());
return;
}
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
setProtocolData(ref, nodeID, idSupplier.getID(ref));
snfQueue.refUp(ref);
refs.add(ref);
if (tx != null) {
logger.debug("sendMessage::Mirroring Message {} with TX", message);
getSendOperation(tx).addRef(ref);
} // if non transactional the afterStoreOperations will use the ref directly and call processReferences
if (sync) {
OperationContext operContext = OperationContextImpl.getContext(server.getExecutorFactory());
if (tx == null) {
// notice that if transactional, the context is lined up on beforeCommit as part of the transaction operation
operContext.replicationLineUp();
}
if (logger.isDebugEnabled()) {
logger.debug("sendMessage::mirror syncUp context={}, ref={}", operContext, ref);
}
ref.setProtocolData(OperationContext.class, operContext);
}
if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true);
}
if (tx == null) {
server.getStorageManager().afterStoreOperations(new IOCallback() {
@Override
public void done() {
PostOfficeImpl.processReference(ref, false);
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
private void syncDone(MessageReference reference) {
OperationContext ctx = reference.getProtocolData(OperationContext.class);
if (ctx != null) {
ctx.replicationDone();
logger.debug("syncDone::replicationDone::ctx={},ref={}", ctx, reference);
} else {
Message message = reference.getMessage();
if (message != null) {
ctx = (OperationContext) message.getUserContext(OperationContext.class);
if (ctx != null) {
ctx.replicationDone();
logger.debug("syncDone::replicationDone message={}", message);
} else {
logger.trace("syncDone::No operationContext set on message {}", message);
}
} else {
logger.debug("syncDone::no message set on reference {}", reference);
}
}
}
public static void validateProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
if (ref.getProtocolData() == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
setProtocolData(referenceIDSupplier, ref);
}
}
/** This method will return the brokerID used by the message */
private static String setProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref) {
String brokerID = referenceIDSupplier.getServerID(ref);
long id = referenceIDSupplier.getID(ref);
setProtocolData(ref, brokerID, id);
return brokerID;
}
private static void setProtocolData(MessageReference ref, String brokerID, long id) {
Map<Symbol, Object> daMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
String brokerID = referenceIDSupplier.getServerID(ref);
// getListID will return null when the message was generated on this broker.
// on this case we do not send the brokerID, and the ControllerTarget will get the information from the link.
// this is just to safe a few bytes and some processing on the wire.
@ -278,8 +347,6 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
daMap.put(BROKER_ID, brokerID);
}
long id = referenceIDSupplier.getID(ref);
daMap.put(INTERNAL_ID, id);
String address = ref.getMessage().getAddress();
if (address != null) { // this is the message that was set through routing
@ -290,9 +357,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
}
}
ref.setProtocolData(deliveryAnnotations);
return brokerID;
ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations);
}
private static Properties getProperties(Message message) {
@ -303,12 +368,30 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
}
private void postACKInternalMessage(MessageReference reference) {
logger.debug("postACKInternalMessage::server={}, ref={}", server, reference);
if (sync) {
syncDone(reference);
}
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) throws Exception {
if (!acks || ref.getQueue().isMirrorController()) {
postACKInternalMessage(ref);
return;
}
}
@Override
public void preAcknowledge(final Transaction tx, final MessageReference ref, final AckReason reason) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("postACKInternalMessage::tx={}, ref={}, reason={}", tx, ref, reason);
}
MirrorController controllerInUse = getControllerInUse();
if (!acks || ref.getQueue().isMirrorController()) { // we don't call postACK on snfqueues, otherwise we would get infinite loop because of this feedback/
if (!acks || ref.getQueue().isMirrorController()) { // we don't call preAcknowledge on snfqueues, otherwise we would get infinite loop because of this feedback/
return;
}
@ -318,28 +401,192 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
logger.debug("{} rejecting postAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref);
logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), ref);
logger.trace("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), ref);
}
return;
}
logger.trace("{} postAcknowledge {}", server, ref);
logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
if (logger.isTraceEnabled()) {
logger.trace("{} sending ack message from server {} with messageID={}", server, nodeID, internalID);
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
if (sync) {
OperationContext operationContext;
operationContext = OperationContextImpl.getContext(server.getExecutorFactory());
messageCommand.setUserContext(OperationContext.class, operationContext);
if (tx == null) {
// notice that if transactional, the context is lined up on beforeCommit as part of the transaction operation
operationContext.replicationLineUp();
}
}
if (tx != null) {
MirrorACKOperation operation = getAckOperation(tx);
// notice the operationContext.replicationLineUp is done on beforeCommit as part of the TX
operation.addMessage(messageCommand, ref);
} else {
server.getStorageManager().afterStoreOperations(new IOCallback() {
@Override
public void done() {
try {
logger.debug("preAcknowledge::afterStoreOperation for messageReference {}", ref);
route(server, messageCommand);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
}
}
private MirrorACKOperation getAckOperation(Transaction tx) {
MirrorACKOperation ackOperation = (MirrorACKOperation) tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
if (ackOperation == null) {
logger.trace("getAckOperation::setting operation on transaction {}", tx);
ackOperation = new MirrorACKOperation(server);
tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, ackOperation);
tx.afterStore(ackOperation);
}
return ackOperation;
}
private MirrorSendOperation getSendOperation(Transaction tx) {
if (tx == null) {
return null;
}
MirrorSendOperation sendOperation = (MirrorSendOperation) tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
if (sendOperation == null) {
logger.trace("getSendOperation::setting operation on transaction {}", tx);
sendOperation = new MirrorSendOperation();
tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, sendOperation);
tx.afterStore(sendOperation);
}
return sendOperation;
}
private static class MirrorACKOperation extends TransactionOperationAbstract {
final ActiveMQServer server;
// This map contains the Message used to generate the command towards the target, the reference being acked
final HashMap<Message, MessageReference> acks = new HashMap<>();
MirrorACKOperation(ActiveMQServer server) {
this.server = server;
}
/**
*
* @param message the message with the instruction to ack on the target node. Notice this is not the message owned by the reference.
* @param ref the reference being acked
*/
public void addMessage(Message message, MessageReference ref) {
acks.put(message, ref);
}
@Override
public void beforeCommit(Transaction tx) {
logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
acks.forEach(this::doBeforeCommit);
}
// callback to be used on forEach
private void doBeforeCommit(Message ack, MessageReference ref) {
OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
if (context != null) {
context.replicationLineUp();
}
}
@Override
public void afterCommit(Transaction tx) {
logger.debug("MirrorACKOperation::afterCommit processing {}", acks);
acks.forEach(this::doAfterCommit);
}
// callback to be used on forEach
private void doAfterCommit(Message ack, MessageReference ref) {
try {
route(server, ack);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
ref.getMessage().usageDown();
}
@Override
public void afterRollback(Transaction tx) {
acks.forEach(this::doAfterRollback);
}
// callback to be used on forEach
private void doAfterRollback(Message ack, MessageReference ref) {
OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class);
if (context != null) {
context.replicationDone();
}
}
}
private static final class MirrorSendOperation extends TransactionOperationAbstract {
final List<MessageReference> refs = new ArrayList<>();
public void addRef(MessageReference ref) {
refs.add(ref);
}
@Override
public void beforeCommit(Transaction tx) {
refs.forEach(this::doBeforeCommit);
}
// callback to be used on forEach
private void doBeforeCommit(MessageReference ref) {
OperationContext context = ref.getProtocolData(OperationContext.class);
if (context != null) {
context.replicationLineUp();
}
}
@Override
public void afterRollback(Transaction tx) {
logger.debug("MirrorSendOperation::afterRollback, refs:{}", refs);
refs.forEach(this::doBeforeRollback);
}
// forEach callback
private void doBeforeRollback(MessageReference ref) {
OperationContext localCTX = ref.getProtocolData(OperationContext.class);
if (localCTX != null) {
localCTX.replicationDone();
}
}
@Override
public void afterCommit(Transaction tx) {
logger.debug("MirrorSendOperation::afterCommit refs:{}", refs);
refs.forEach(this::doAfterCommit);
}
// forEach callback
private void doAfterCommit(MessageReference ref) {
PostOfficeImpl.processReference(ref, false);
}
Message message = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
route(server, message);
ref.getMessage().usageDown();
}
private Message createMessage(SimpleString address, SimpleString queue, Object event, String brokerID, Object body) {

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
@ -279,6 +278,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
return AddressInfo.fromJSON(body);
}
@Override
public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
// NO-OP
}
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
logger.debug("{} adding address {}", server, addressInfo);
@ -359,11 +363,12 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}", nodeID, messageID, targetQueue.getName());
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). Ref={}", nodeID, messageID, targetQueue.getName(), reference);
}
if (reference == null) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying Reference not found on messageID={}, nodeID={}, currentRetry={}", messageID, nodeID, retry);
@ -490,7 +495,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
@Override
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
// Do nothing
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.HashMap;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.collections.NodeStore;
@ -112,7 +113,12 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
}
public String getServerID(MessageReference element) {
Object nodeID = element.getMessage().getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
return getServerID(element.getMessage());
}
public String getServerID(Message message) {
Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
if (nodeID != null) {
return nodeID.toString();
} else {
@ -124,7 +130,8 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
}
public long getID(MessageReference element) {
Long id = (Long) element.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
Message message = element.getMessage();
Long id = getID(message);
if (id == null) {
return element.getMessageID();
} else {
@ -132,6 +139,10 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
}
}
private Long getID(Message message) {
return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
}
@Override
public synchronized void clear() {
lists.forEach((k, v) -> v.clear());

View File

@ -683,14 +683,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
frameBuffer.clear();
DeliveryAnnotations deliveryAnnotationsToEncode;
message.checkReference(reference);
if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) {
deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData();
} else {
deliveryAnnotationsToEncode = null;
}
DeliveryAnnotations deliveryAnnotationsToEncode = reference.getProtocolData(DeliveryAnnotations.class);
try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer));

View File

@ -295,7 +295,7 @@ public class AMQConsumer {
//handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID());
int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId());
reference.setProtocolData(MessageId.class, dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
currentWindow.decrementAndGet();
return size;
@ -337,7 +337,7 @@ public class AMQConsumer {
// if it's browse only, nothing to be acked
final boolean removeReferences = !serverConsumer.isBrowseOnly() && !serverConsumer.getQueue().isNonDestructive();
final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData(MessageId.class)), reference -> lastID.equals(reference.getProtocolData(MessageId.class)));
if (!ackList.isEmpty() || !removeReferences || serverConsumer.getQueue().isTemporary()) {

View File

@ -28,6 +28,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
boolean messageAcknowledgements = true;
boolean sync = false;
SimpleString mirrorSNF;
String addressFilter;
@ -98,4 +100,12 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
return this;
}
public boolean isSync() {
return sync;
}
public AMQPMirrorBrokerConnectionElement setSync(boolean sync) {
this.sync = sync;
return this;
}
}

View File

@ -2121,10 +2121,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
boolean queueCreation = getBooleanAttribute(e2,"queue-creation", true);
boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
boolean sync = getBooleanAttribute(e2, "sync", false);
String addressFilter = getAttributeValue(e2, "address-filter");
AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter);
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
} else {

View File

@ -26,14 +26,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AbstractProtocolReference;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference, Runnable {
public class PagedReferenceImpl extends AbstractProtocolReference implements PagedReference, Runnable {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -75,8 +75,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private boolean alreadyAcked;
private Object protocolData;
//0 is false, 1 is true, 2 not defined
private static final byte IS_NOT_LARGE_MESSAGE = 0;
private static final byte IS_LARGE_MESSAGE = 1;
@ -97,16 +95,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private static final byte UNDEFINED_IS_DURABLE = -1;
private byte durable = UNDEFINED_IS_DURABLE;
@Override
public Object getProtocolData() {
return protocolData;
}
@Override
public void setProtocolData(Object protocolData) {
this.protocolData = protocolData;
}
@Override
public Message getMessage() {
return getPagedMessage().getMessage();

View File

@ -177,8 +177,12 @@ public class OperationContextImpl implements OperationContext {
}
} else {
if (storeOnly) {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) {
executeNow = true;
} else {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
}
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);

View File

@ -224,4 +224,8 @@ public interface PostOffice extends ActiveMQComponent {
default AddressManager getAddressManager() {
return null;
}
default void preAcknowledge(final Transaction tx, final MessageReference ref, AckReason reason) {
}
}

View File

@ -246,6 +246,18 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return this;
}
@Override
public void preAcknowledge(final Transaction tx, final MessageReference ref, AckReason reason) {
if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // we don't send replacements on LVQ as they are replaced themselves on the target
try {
mirrorControllerSource.preAcknowledge(tx, ref, reason);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // we don't send replacements on LVQ as they are replaced themselves on the target
@ -1624,7 +1636,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (mirrorControllerSource != null && !context.isMirrorDisabled()) {
// we check for isMirrorDisabled as to avoid recursive loop from there
mirrorControllerSource.sendMessage(message, context, refs);
mirrorControllerSource.sendMessage(tx, message, context);
}
@ -1647,10 +1659,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
private static void processReferences(List<MessageReference> refs, boolean direct) {
for (MessageReference ref : refs) {
ref.getQueue().addTail(ref, direct);
}
public static void processReferences(List<MessageReference> refs, boolean direct) {
refs.forEach((ref) -> processReference(ref, direct));
}
public static void processReference(MessageReference ref, boolean direct) {
ref.getQueue().addTail(ref, direct);
}
private void processRouteToDurableQueues(final Message message,

View File

@ -74,13 +74,13 @@ public interface MessageReference {
* To be used on holding protocol specific data during the delivery.
* This will be only valid while the message is on the delivering queue at the consumer
*/
Object getProtocolData();
<T> T getProtocolData(Class<T> typeClass);
/**
* To be used on holding protocol specific data during the delivery.
* This will be only valid while the message is on the delivering queue at the consumer
*/
void setProtocolData(Object data);
<T> void setProtocolData(Class<T> typeClass, T data);
MessageReference copy(Queue queue);

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
/** I need to store protocol specific data on the references. The same need exists in both PagedReference and MessageReferenceImpl.
* This class will serve the purpose to keep the specific protocol data for either reference.
* */
public abstract class AbstractProtocolReference extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
private HashMap<Class, Object> protocolDataMap;
@Override
public <T> T getProtocolData(Class<T> classType) {
if (protocolDataMap == null) {
return null;
} else {
return (T)protocolDataMap.get(classType);
}
}
@Override
public <T> void setProtocolData(Class<T> classType, T protocolData) {
if (protocolDataMap == null) {
protocolDataMap = new HashMap<>();
}
protocolDataMap.put(classType, protocolData);
}
}

View File

@ -89,13 +89,13 @@ public class GroupFirstMessageReference implements MessageReference {
}
@Override
public Object getProtocolData() {
return messageReference.getProtocolData();
public <T> T getProtocolData(Class<T> typeClass) {
return messageReference.getProtocolData(typeClass);
}
@Override
public void setProtocolData(Object data) {
messageReference.setProtocolData(data);
public <T> void setProtocolData(Class<T> typeClass, T data) {
messageReference.setProtocolData(typeClass, data);
}
@Override

View File

@ -27,12 +27,11 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
/**
* Implementation of a MessageReference
*/
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
public class MessageReferenceImpl extends AbstractProtocolReference implements MessageReference, Runnable {
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
@ -78,8 +77,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
private boolean deliveredDirectly;
private Object protocolData;
private Consumer<? super MessageReference> onDelivery;
@ -138,15 +135,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
}
}
@Override
public Object getProtocolData() {
return protocolData;
}
@Override
public void setProtocolData(Object protocolData) {
this.protocolData = protocolData;
}
/**
* @return the persistedCount

View File

@ -1877,9 +1877,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (logger.isTraceEnabled()) {
logger.trace("{} acknowledge tx={} ref={}, reason={}, consumer={}", this, transactional, ref, reason, consumer);
logger.trace("queue.acknowledge serverIdentity={}, queue={} acknowledge tx={} ref={}, reason={}, consumer={}", server.getIdentity(), this.getName(), transactional, ref, reason, consumer);
}
postOffice.preAcknowledge(tx, ref, reason);
if (nonDestructive && reason == AckReason.NORMAL) {
if (transactional) {
refsOperation.addOnlyRefAck(ref);

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.mirror;
import java.util.List;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -25,7 +23,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
/**
* This represents the contract we will use to send messages to replicas.
@ -35,9 +33,10 @@ public interface MirrorController {
void deleteAddress(AddressInfo addressInfo) throws Exception;
void createQueue(QueueConfiguration queueConfiguration) throws Exception;
void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception;
void sendMessage(Message message, RoutingContext context, List<MessageReference> refs);
void sendMessage(Transaction tx, Message message, RoutingContext context);
void postAcknowledge(MessageReference ref, AckReason reason) throws Exception;
void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception;
String getRemoteMirrorId();
}

View File

@ -37,4 +37,8 @@ public class TransactionPropertyIndexes {
public static final int EXPIRY_LOGGER = 9;
public static final int CONSUMER_METRICS_OPERATION = 10;
public static final int MIRROR_ACK_OPERATION = 11;
public static final int MIRROR_SEND_OPERATION = 12;
}

View File

@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import io.netty.util.collection.IntObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@ -50,7 +50,7 @@ public class TransactionImpl implements Transaction {
private static final int INITIAL_NUM_PROPERTIES = 11;
private Object[] properties = null;
private IntObjectHashMap properties = null;
protected final StorageManager storageManager;
@ -72,22 +72,6 @@ public class TransactionImpl implements Transaction {
private Object protocolData;
private void ensurePropertiesCapacity(int capacity) {
if (properties != null && properties.length >= capacity) {
return;
}
createOrEnlargeProperties(capacity);
}
private void createOrEnlargeProperties(int capacity) {
if (properties == null) {
properties = new Object[Math.min(TransactionImpl.INITIAL_NUM_PROPERTIES, capacity)];
} else {
assert properties.length < capacity;
properties = Arrays.copyOf(properties, capacity);
}
}
@Override
public Object getProtocolData() {
return protocolData;
@ -529,14 +513,17 @@ public class TransactionImpl implements Transaction {
@Override
public void putProperty(final int index, final Object property) {
ensurePropertiesCapacity(index + 1);
properties[index] = property;
if (properties == null) {
properties = new IntObjectHashMap();
}
properties.put(index, property);
}
@Override
public Object getProperty(final int index) {
return properties == null ? null : (index < properties.length ? properties[index] : null);
return properties == null ? null : properties.get(index);
}
// Private

View File

@ -2447,6 +2447,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="sync" type="xsd:boolean" use="optional" default="false">
<xsd:annotation>
<xsd:documentation>
If this is true, client blocking operations will be waiting a response from the mirror before the unblocking the operation.
This is false by default.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>

View File

@ -709,6 +709,11 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
@Test
public void testAMQPConnectionsConfiguration() throws Throwable {
testAMQPConnectionsConfiguration(true);
testAMQPConnectionsConfiguration(false);
}
private void testAMQPConnectionsConfiguration(boolean sync) throws Throwable {
ConfigurationImpl configuration = new ConfigurationImpl();
Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties();
@ -723,6 +728,9 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueCreation", "true");
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueRemoval", "true");
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.addressFilter", "foo");
if (sync) {
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.sync", "true");
} // else we just use the default that is false
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
@ -742,6 +750,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(true, amqpMirrorBrokerConnectionElement.isMessageAcknowledgements());
Assert.assertEquals(true, amqpMirrorBrokerConnectionElement.isQueueCreation());
Assert.assertEquals(true, amqpMirrorBrokerConnectionElement.isQueueRemoval());
Assert.assertEquals(sync, ((AMQPMirrorBrokerConnectionElement) amqpBrokerConnectionElement).isSync());
Assert.assertEquals("foo", amqpMirrorBrokerConnectionElement.getAddressFilter());
}

View File

@ -442,7 +442,7 @@
<receiver address-match="TEST-RECEIVER" />
<peer address-match="TEST-PEER"/>
<receiver queue-name="TEST-WITH-QUEUE-NAME"/>
<mirror message-acknowledgements="false" queue-creation="false" durable="false" queue-removal="false" address-filter="TEST-QUEUE,!IGNORE-QUEUE"/>
<mirror message-acknowledgements="false" queue-creation="false" durable="false" queue-removal="false" address-filter="TEST-QUEUE,!IGNORE-QUEUE" sync="true"/>
</amqp-connection>
<amqp-connection uri="tcp://test2:222" name="test2">
<mirror durable="false"/>

View File

@ -81,8 +81,7 @@ The previous example portrays a case of connection failure towards ServerA. The
<div style="page-break-after: always"></div>
## Mirroring
The idea of mirroring is to send events that happen on a broker towards another broker, without blocking any operations from producers and consumers, allowing them to keep operating as fast as possible.
It can be used for Disaster Recovery, and depending on the requirements even for failing over the data.
Mirroring will reproduce any operation that happened on the source brokers towards a target broker.
The following events are sent through mirroring:
@ -94,6 +93,8 @@ The following events are sent through mirroring:
* Queue and address creation.
* Queue and address deletion.
By default every operation is sent asynchronously without blocking any clients. However if you set sync="true" on the mirror configuration, the clients will always wait a mirror on every blocking operation.
### Mirror configuration
Add a `<mirror>` element within the `<amqp-connection>` element to configure mirroring to the target broker.
@ -119,9 +120,10 @@ The following optional arguments can be utilized:
matches all addresses starting with 'eu' but not those starting with 'eu.uk'
**Note:**
- Address exclusion will always take precedence over address inclusion.
- Address matching on mirror elements is prefix-based and does not support wild-card matching.
* `sync`: By default is false. If set it to true any client blocking operation will be held until the mirror as confirmed receiving the operation.
* Notice that a disconnected node would hold all operations from the client. If you set sync=true you must reconnect a mirror before performing any operations.
An example of a mirror configuration is shown below:
```xml

View File

@ -77,7 +77,7 @@ public class AmqpReferenceDeliveryAnnotationTest extends AmqpClientTestSupport {
Map<Symbol, Object> symbolObjectMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(symbolObjectMap);
symbolObjectMap.put(Symbol.getSymbol("KEY"), uuid);
reference.setProtocolData(deliveryAnnotations);
reference.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations);
}
});

View File

@ -681,9 +681,11 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.getConfiguration().setName("thisone");
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks).setDurable(true);
replica.setName("theReplica");
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.getConfiguration().setName("server_2");
int NUMBER_OF_MESSAGES = 200;
@ -698,7 +700,6 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
if (!deferredStart) {
Queue queueOnServer1 = locateQueue(server, getQueueName());

View File

@ -0,0 +1,474 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
private static final String SLOW_SERVER_NAME = "slow";
private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
private ActiveMQServer slowServer;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test
public void testPersistedSendAMQP() throws Exception {
testPersistedSend("AMQP", false, 100);
}
@Test
public void testPersistedSendAMQPLarge() throws Exception {
testPersistedSend("AMQP", false, 200 * 1024);
}
@Test
public void testPersistedSendCore() throws Exception {
testPersistedSend("CORE", false, 100);
}
@Test
public void testPersistedSendCoreLarge() throws Exception {
testPersistedSend("CORE", false, 200 * 1024);
}
@Test
public void testPersistedSendAMQPTXLarge() throws Exception {
testPersistedSend("AMQP", true, 200 * 1024);
}
@Test
public void testPersistedSendAMQPTX() throws Exception {
testPersistedSend("AMQP", true, 100);
}
@Test
public void testPersistedSendCoreTX() throws Exception {
testPersistedSend("CORE", true, 100);
}
@Test
public void testPersistedSendCoreTXLarge() throws Exception {
testPersistedSend("CORE", true, 200 * 1024);
}
private void testPersistedSend(String protocol, boolean transactional, int messageSize) throws Exception {
ReusableLatch sendPending = new ReusableLatch(0);
Semaphore semSend = new Semaphore(1);
Semaphore semAck = new Semaphore(1);
AtomicInteger errors = new AtomicInteger(0);
try {
final int NUMBER_OF_MESSAGES = 10;
AtomicInteger countStored = new AtomicInteger(0);
slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
if (logger.isDebugEnabled()) {
logger.debug("StorageCallback::slow isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, record);
}
if (transactional) {
if (isTX) {
try {
if (countStored.get() > 0) {
countStored.incrementAndGet();
logger.debug("semSend.tryAcquire");
if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
logger.debug("acquired TX, now release");
semSend.release();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
logger.debug("slow ACK REF");
try {
if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
semAck.release();
logger.debug("slow acquired ACK semaphore");
} else {
logger.debug("Semaphore wasn't acquired");
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
try {
countStored.incrementAndGet();
if (!transactional) {
logger.debug("semSend.tryAcquire");
if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
logger.debug("acquired non TX now release");
semSend.release();
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
}
}
});
slowServer.setIdentity("slowServer");
server.setIdentity("server");
ExecutorService pool = Executors.newFixedThreadPool(5);
runAfter(pool::shutdown);
configureMirrorTowardsSlow(server);
slowServer.getConfiguration().setName("slow");
server.getConfiguration().setName("fast");
slowServer.start();
server.start();
waitForServerToStart(slowServer);
waitForServerToStart(server);
server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
Queue replicatedQueue = slowServer.locateQueue(getQueueName());
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + AMQP_PORT);
if (factory instanceof ActiveMQConnectionFactory) {
((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true);
}
Connection connection = factory.createConnection();
runAfter(connection::close);
Session session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
connection.start();
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
final String bodyMessage;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < messageSize; i++) {
buffer.append("large Buffer...");
}
bodyMessage = buffer.toString();
}
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
logger.debug("===>>> send message {}", i);
int theI = i;
sendPending.countUp();
logger.debug("semSend.acquire");
semSend.acquire();
if (!transactional) {
pool.execute(() -> {
try {
logger.debug("Entering non TX send with sendPending = {}", sendPending.getCount());
TextMessage message = session.createTextMessage(bodyMessage);
message.setStringProperty("strProperty", "" + theI);
producer.send(message);
sendPending.countDown();
logger.debug("leaving non TX send with sendPending = {}", sendPending.getCount());
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
}
});
} else {
CountDownLatch sendDone = new CountDownLatch(1);
pool.execute(() -> {
try {
TextMessage message = session.createTextMessage(bodyMessage);
message.setStringProperty("strProperty", "" + theI);
producer.send(message);
} catch (Throwable e) {
errors.incrementAndGet();
logger.warn(e.getMessage(), e);
}
sendDone.countDown();
});
Wait.assertEquals(i, replicatedQueue::getMessageCount);
Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
pool.execute(() -> {
try {
session.commit();
sendPending.countDown();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
});
}
Assert.assertFalse("sendPending.await() not supposed to succeed", sendPending.await(10, TimeUnit.MILLISECONDS));
logger.debug("semSend.release");
semSend.release();
Assert.assertTrue(sendPending.await(10, TimeUnit.SECONDS));
Wait.assertEquals(i + 1, replicatedQueue::getMessageCount);
}
if (!transactional) {
Wait.assertEquals(NUMBER_OF_MESSAGES, countStored::get);
}
Wait.assertEquals(NUMBER_OF_MESSAGES, replicatedQueue::getMessageCount);
connection.start();
Session clientSession = transactional ? connection.createSession(true, Session.AUTO_ACKNOWLEDGE) : connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = clientSession.createConsumer(clientSession.createQueue(getQueueName()));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
logger.debug("===<<< Receiving message {}", i);
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
semAck.acquire();
CountDownLatch countDownLatch = new CountDownLatch(1);
pool.execute(() -> {
try {
if (transactional) {
clientSession.commit();
} else {
message.acknowledge();
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
countDownLatch.countDown();
}
});
if (!transactional && protocol.equals("AMQP")) {
// non transactional ack in AMQP is always async. No need to verify anything else here
logger.debug("non transactional and amqp is always asynchronous. No need to verify anything");
} else {
Assert.assertFalse(countDownLatch.await(10, TimeUnit.MILLISECONDS));
}
semAck.release();
Assert.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
Wait.assertEquals(NUMBER_OF_MESSAGES - i - 1, replicatedQueue::getMessageCount);
}
Assert.assertEquals(0, errors.get());
} finally {
semAck.release();
semSend.release();
}
}
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServerWithCallbackStorage(AMQP_PORT, "fastServer", (isUpdate, isTX, txId, id, recordType, persister, record) -> {
if (logger.isDebugEnabled()) {
logger.debug("StorageCallback::fast isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, record);
}
});
addServer(server);
return server;
}
private void configureMirrorTowardsSlow(ActiveMQServer source) {
AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true);
connection.addElement(replication);
source.getConfiguration().addAMQPConnection(connection);
}
private ActiveMQServer createServerWithCallbackStorage(int port, String name, StorageCallback storageCallback) throws Exception {
ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(port), mBeanServer, securityManager) {
@Override
protected StorageManager createStorageManager() {
return AMQPSyncMirrorTest.this.createCallbackStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener, storageCallback);
}
};
server.getConfiguration().setName(name);
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer, port));
server.getConfiguration().setMessageExpiryScanPeriod(-1);
server.getConfiguration().setJMXManagementEnabled(true);
configureAddressPolicy(server);
configureBrokerSecurity(server);
addServer(server);
return server;
}
private interface StorageCallback {
void storage(boolean isUpdate,
boolean isCommit,
long txID,
long id,
byte recordType,
Persister persister,
Object record);
}
private StorageManager createCallbackStorageManager(Configuration configuration,
CriticalAnalyzer criticalAnalyzer,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledPool,
ExecutorFactory ioExecutorFactory,
IOCriticalErrorListener ioCriticalErrorListener,
StorageCallback storageCallback) {
return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
@Override
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
Persister persister,
Object record) throws Exception {
storageCallback.storage(false, false, txID, id, recordType, persister, record);
super.appendAddRecordTransactional(txID, id, recordType, persister, record);
}
@Override
public void appendAddRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
IOCompletion callback) throws Exception {
storageCallback.storage(false, false, -1, id, recordType, persister, record);
super.appendAddRecord(id, recordType, persister, record, sync, callback);
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync) throws Exception {
storageCallback.storage(true, false, -1, id, recordType, null, record);
super.appendUpdateRecord(id, recordType, record, sync);
}
@Override
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
EncodingSupport record) throws Exception {
storageCallback.storage(true, false, txID, id, recordType, null, record);
super.appendUpdateRecordTransactional(txID, id, recordType, record);
}
@Override
public void appendCommitRecord(long txID,
boolean sync,
IOCompletion callback,
boolean lineUpContext) throws Exception {
storageCallback.storage(false, true, txID, txID, (byte)0, null, null);
super.appendCommitRecord(txID, sync, callback, lineUpContext);
}
@Override
public void tryAppendUpdateRecord(long id,
byte recordType,
Persister persister,
Object record,
boolean sync,
boolean replaceableUpdate,
JournalUpdateCallback updateCallback,
IOCompletion callback) throws Exception {
storageCallback.storage(true, false, -1, -1, recordType, persister, record);
super.tryAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback);
}
};
}
};
}
}

View File

@ -94,6 +94,7 @@ public class ConfigurationValidationTest extends ActiveMQTestBase {
Assert.assertFalse(mirrorConnectionElement.isQueueCreation());
Assert.assertFalse(mirrorConnectionElement.isQueueRemoval());
Assert.assertFalse(mirrorConnectionElement.isDurable());
Assert.assertTrue(mirrorConnectionElement.isSync());
amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(1);
@ -104,6 +105,7 @@ public class ConfigurationValidationTest extends ActiveMQTestBase {
Assert.assertFalse(mirrorConnectionElement.isDurable());
Assert.assertTrue(mirrorConnectionElement.isQueueCreation());
Assert.assertTrue(mirrorConnectionElement.isQueueRemoval());
Assert.assertFalse(mirrorConnectionElement.isSync()); // checking the default
amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(2);
Assert.assertFalse(amqpBrokerConnectConfiguration.isAutostart());