From 0d3cd8d88031dbf78851969a56041e839f828bf7 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 18 Jan 2023 16:58:39 -0500 Subject: [PATCH] ARTEMIS-4136 Mirrored sync replica I am adding an option sync=true or false on mirror. if sync, any client blocking operation will wait a roundtrip to the mirror acting like a sync replica. --- .../amqp/broker/AMQPLargeMessage.java | 5 +- .../protocol/amqp/broker/AMQPMessage.java | 8 +- .../AMQPMirrorControllerAggregation.java | 12 +- .../mirror/AMQPMirrorControllerSource.java | 299 ++++++++++- .../mirror/AMQPMirrorControllerTarget.java | 15 +- .../connect/mirror/ReferenceNodeStore.java | 15 +- .../proton/ProtonServerSenderContext.java | 7 +- .../protocol/openwire/amq/AMQConsumer.java | 4 +- .../AMQPMirrorBrokerConnectionElement.java | 10 + .../impl/FileConfigurationParser.java | 3 +- .../paging/cursor/PagedReferenceImpl.java | 16 +- .../impl/journal/OperationContextImpl.java | 8 +- .../artemis/core/postoffice/PostOffice.java | 4 + .../core/postoffice/impl/PostOfficeImpl.java | 24 +- .../artemis/core/server/MessageReference.java | 4 +- .../impl/AbstractProtocolReference.java | 49 ++ .../impl/GroupFirstMessageReference.java | 8 +- .../server/impl/MessageReferenceImpl.java | 13 +- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../core/server/mirror/MirrorController.java | 7 +- .../TransactionPropertyIndexes.java | 4 + .../transaction/impl/TransactionImpl.java | 29 +- .../schema/artemis-configuration.xsd | 8 + .../config/impl/ConfigurationImplTest.java | 9 + .../ConfigurationTest-full-config.xml | 2 +- .../user-manual/en/amqp-broker-connections.md | 8 +- .../AmqpReferenceDeliveryAnnotationTest.java | 2 +- .../amqp/connect/AMQPReplicaTest.java | 5 +- .../amqp/connect/AMQPSyncMirrorTest.java | 474 ++++++++++++++++++ .../impl/ConfigurationValidationTest.java | 2 + 30 files changed, 935 insertions(+), 123 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index ec86311338..e9987ed9ff 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; @@ -148,8 +149,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage * It was written to check the deliveryAnnotationsForSendBuffer and eventually move it to the protocolData. */ public void checkReference(MessageReference reference) { - if (reference.getProtocolData() == null && deliveryAnnotationsForSendBuffer != null) { - reference.setProtocolData(deliveryAnnotationsForSendBuffer); + if (reference.getProtocolData(DeliveryAnnotations.class) == null && deliveryAnnotationsForSendBuffer != null) { + reference.setProtocolData(DeliveryAnnotations.class, deliveryAnnotationsForSendBuffer); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index df3bdeda73..84e319cd2c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -754,12 +754,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) { ensureDataIsValid(); - DeliveryAnnotations daToWrite; + DeliveryAnnotations daToWrite = reference != null ? reference.getProtocolData(DeliveryAnnotations.class) : null; - if (reference != null && reference.getProtocolData() instanceof DeliveryAnnotations) { - daToWrite = (DeliveryAnnotations) reference.getProtocolData(); - } else { - // deliveryAnnotationsForSendBuffer was an old API form where a deliver could set it before deliver + if (reference == null) { + // deliveryAnnotationsForSendBuffer is part of an older API, deprecated but still present daToWrite = deliveryAnnotationsForSendBuffer; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java index cc5fb7eafb..007f9a0d83 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.transaction.Transaction; /** this will be used when there are multiple replicas in use. */ public class AMQPMirrorControllerAggregation implements MirrorController, ActiveMQComponent { @@ -72,6 +73,13 @@ public class AMQPMirrorControllerAggregation implements MirrorController, Active return partitions; } + @Override + public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception { + for (MirrorController partition : partitions) { + partition.preAcknowledge(tx, ref, reason); + } + } + @Override public void addAddress(AddressInfo addressInfo) throws Exception { for (MirrorController partition : partitions) { @@ -102,9 +110,9 @@ public class AMQPMirrorControllerAggregation implements MirrorController, Active } @Override - public void sendMessage(Message message, RoutingContext context, List refs) { + public void sendMessage(Transaction tx, Message message, RoutingContext context) { for (MirrorController partition : partitions) { - partition.sendMessage(message, context, refs); + partition.sendMessage(tx, message, context); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 188742af2c..d9052c4623 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.connect.mirror; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -24,6 +25,9 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -34,6 +38,9 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; @@ -86,6 +93,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im final boolean deleteQueues; final MirrorAddressFilter addressFilter; private final AMQPBrokerConnection brokerConnection; + private final boolean sync; final AMQPMirrorBrokerConnectionElement replicaConfig; @@ -116,6 +124,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter()); this.acks = replicaConfig.isMessageAcknowledgements(); this.brokerConnection = brokerConnection; + this.sync = replicaConfig.isSync(); } public Queue getSnfQueue() { @@ -216,60 +225,120 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } @Override - public void sendMessage(Message message, RoutingContext context, List refs) { + public void sendMessage(Transaction tx, Message message, RoutingContext context) { SimpleString address = context.getAddress(message); if (invalidTarget(context.getMirrorSource())) { - logger.trace("server {} is discarding send to avoid infinite loop (reflection with the mirror)", server); + logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server); return; } if (context.isInternal()) { - logger.trace("server {} is discarding send to avoid sending to internal queue", server); + logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); return; } if (ignoreAddress(address)) { - logger.trace("server {} is discarding send to address {}, address doesn't match filter", server, address); + logger.trace("sendMessage::server {} is discarding send to address {}, address doesn't match filter", server, address); return; } - logger.trace("{} send message {}", server, message); + logger.trace("sendMessage::{} send message {}", server, message); try { context.setReusable(false); - MessageReference ref = MessageReference.Factory.createReference(message, snfQueue); - String nodeID = setProtocolData(idSupplier, ref); + String nodeID = idSupplier.getServerID(message); + if (nodeID != null && nodeID.equals(getRemoteMirrorId())) { - logger.trace("Message {} already belonged to the node, {}, it won't circle send", message, getRemoteMirrorId()); + logger.trace("sendMessage::Message {} already belonged to the node, {}, it won't circle send", message, getRemoteMirrorId()); return; } + + MessageReference ref = MessageReference.Factory.createReference(message, snfQueue); + setProtocolData(ref, nodeID, idSupplier.getID(ref)); + snfQueue.refUp(ref); - refs.add(ref); + + if (tx != null) { + logger.debug("sendMessage::Mirroring Message {} with TX", message); + getSendOperation(tx).addRef(ref); + } // if non transactional the afterStoreOperations will use the ref directly and call processReferences + + if (sync) { + OperationContext operContext = OperationContextImpl.getContext(server.getExecutorFactory()); + if (tx == null) { + // notice that if transactional, the context is lined up on beforeCommit as part of the transaction operation + operContext.replicationLineUp(); + } + if (logger.isDebugEnabled()) { + logger.debug("sendMessage::mirror syncUp context={}, ref={}", operContext, ref); + } + ref.setProtocolData(OperationContext.class, operContext); + } if (message.isDurable() && snfQueue.isDurable()) { PostOfficeImpl.storeDurableReference(server.getStorageManager(), message, context.getTransaction(), snfQueue, true); } + if (tx == null) { + server.getStorageManager().afterStoreOperations(new IOCallback() { + @Override + public void done() { + PostOfficeImpl.processReference(ref, false); + } + + @Override + public void onError(int errorCode, String errorMessage) { + } + }); + } } catch (Throwable e) { logger.warn(e.getMessage(), e); } } + private void syncDone(MessageReference reference) { + OperationContext ctx = reference.getProtocolData(OperationContext.class); + if (ctx != null) { + ctx.replicationDone(); + logger.debug("syncDone::replicationDone::ctx={},ref={}", ctx, reference); + } else { + Message message = reference.getMessage(); + if (message != null) { + ctx = (OperationContext) message.getUserContext(OperationContext.class); + if (ctx != null) { + ctx.replicationDone(); + logger.debug("syncDone::replicationDone message={}", message); + } else { + logger.trace("syncDone::No operationContext set on message {}", message); + } + } else { + logger.debug("syncDone::no message set on reference {}", reference); + } + } + } + public static void validateProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { - if (ref.getProtocolData() == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { + if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { setProtocolData(referenceIDSupplier, ref); } } /** This method will return the brokerID used by the message */ private static String setProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref) { + String brokerID = referenceIDSupplier.getServerID(ref); + long id = referenceIDSupplier.getID(ref); + + setProtocolData(ref, brokerID, id); + + return brokerID; + } + + private static void setProtocolData(MessageReference ref, String brokerID, long id) { Map daMap = new HashMap<>(); DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap); - String brokerID = referenceIDSupplier.getServerID(ref); - // getListID will return null when the message was generated on this broker. // on this case we do not send the brokerID, and the ControllerTarget will get the information from the link. // this is just to safe a few bytes and some processing on the wire. @@ -278,8 +347,6 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im daMap.put(BROKER_ID, brokerID); } - long id = referenceIDSupplier.getID(ref); - daMap.put(INTERNAL_ID, id); String address = ref.getMessage().getAddress(); if (address != null) { // this is the message that was set through routing @@ -290,9 +357,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress()); } } - ref.setProtocolData(deliveryAnnotations); - - return brokerID; + ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations); } private static Properties getProperties(Message message) { @@ -303,12 +368,30 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } } + private void postACKInternalMessage(MessageReference reference) { + logger.debug("postACKInternalMessage::server={}, ref={}", server, reference); + if (sync) { + syncDone(reference); + } + } + @Override public void postAcknowledge(MessageReference ref, AckReason reason) throws Exception { + if (!acks || ref.getQueue().isMirrorController()) { + postACKInternalMessage(ref); + return; + } + } + + @Override + public void preAcknowledge(final Transaction tx, final MessageReference ref, final AckReason reason) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("postACKInternalMessage::tx={}, ref={}, reason={}", tx, ref, reason); + } MirrorController controllerInUse = getControllerInUse(); - if (!acks || ref.getQueue().isMirrorController()) { // we don't call postACK on snfqueues, otherwise we would get infinite loop because of this feedback/ + if (!acks || ref.getQueue().isMirrorController()) { // we don't call preAcknowledge on snfqueues, otherwise we would get infinite loop because of this feedback/ return; } @@ -318,28 +401,192 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) { if (logger.isDebugEnabled()) { - logger.debug("{} rejecting postAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref); + logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref); } return; } if (ignoreAddress(ref.getQueue().getAddress())) { if (logger.isTraceEnabled()) { - logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), ref); + logger.trace("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), ref); } return; } - logger.trace("{} postAcknowledge {}", server, ref); + logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref); String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker. long internalID = idSupplier.getID(ref); - if (logger.isTraceEnabled()) { - logger.trace("{} sending ack message from server {} with messageID={}", server, nodeID, internalID); + Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason); + if (sync) { + OperationContext operationContext; + operationContext = OperationContextImpl.getContext(server.getExecutorFactory()); + messageCommand.setUserContext(OperationContext.class, operationContext); + if (tx == null) { + // notice that if transactional, the context is lined up on beforeCommit as part of the transaction operation + operationContext.replicationLineUp(); + } + } + + if (tx != null) { + MirrorACKOperation operation = getAckOperation(tx); + // notice the operationContext.replicationLineUp is done on beforeCommit as part of the TX + operation.addMessage(messageCommand, ref); + } else { + server.getStorageManager().afterStoreOperations(new IOCallback() { + @Override + public void done() { + try { + logger.debug("preAcknowledge::afterStoreOperation for messageReference {}", ref); + route(server, messageCommand); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + } + }); + } + } + + private MirrorACKOperation getAckOperation(Transaction tx) { + MirrorACKOperation ackOperation = (MirrorACKOperation) tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION); + if (ackOperation == null) { + logger.trace("getAckOperation::setting operation on transaction {}", tx); + ackOperation = new MirrorACKOperation(server); + tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, ackOperation); + tx.afterStore(ackOperation); + } + + return ackOperation; + } + + private MirrorSendOperation getSendOperation(Transaction tx) { + if (tx == null) { + return null; + } + MirrorSendOperation sendOperation = (MirrorSendOperation) tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION); + if (sendOperation == null) { + logger.trace("getSendOperation::setting operation on transaction {}", tx); + sendOperation = new MirrorSendOperation(); + tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, sendOperation); + tx.afterStore(sendOperation); + } + + return sendOperation; + } + + private static class MirrorACKOperation extends TransactionOperationAbstract { + + final ActiveMQServer server; + + // This map contains the Message used to generate the command towards the target, the reference being acked + final HashMap acks = new HashMap<>(); + + MirrorACKOperation(ActiveMQServer server) { + this.server = server; + } + + /** + * + * @param message the message with the instruction to ack on the target node. Notice this is not the message owned by the reference. + * @param ref the reference being acked + */ + public void addMessage(Message message, MessageReference ref) { + acks.put(message, ref); + } + + @Override + public void beforeCommit(Transaction tx) { + logger.debug("MirrorACKOperation::beforeCommit processing {}", acks); + acks.forEach(this::doBeforeCommit); + } + + // callback to be used on forEach + private void doBeforeCommit(Message ack, MessageReference ref) { + OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class); + if (context != null) { + context.replicationLineUp(); + } + } + + @Override + public void afterCommit(Transaction tx) { + logger.debug("MirrorACKOperation::afterCommit processing {}", acks); + acks.forEach(this::doAfterCommit); + } + + // callback to be used on forEach + private void doAfterCommit(Message ack, MessageReference ref) { + try { + route(server, ack); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + ref.getMessage().usageDown(); + } + + @Override + public void afterRollback(Transaction tx) { + acks.forEach(this::doAfterRollback); + } + + // callback to be used on forEach + private void doAfterRollback(Message ack, MessageReference ref) { + OperationContext context = (OperationContext) ack.getUserContext(OperationContext.class); + if (context != null) { + context.replicationDone(); + } + } + + } + + private static final class MirrorSendOperation extends TransactionOperationAbstract { + final List refs = new ArrayList<>(); + + public void addRef(MessageReference ref) { + refs.add(ref); + } + + @Override + public void beforeCommit(Transaction tx) { + refs.forEach(this::doBeforeCommit); + } + + // callback to be used on forEach + private void doBeforeCommit(MessageReference ref) { + OperationContext context = ref.getProtocolData(OperationContext.class); + if (context != null) { + context.replicationLineUp(); + } + } + + @Override + public void afterRollback(Transaction tx) { + logger.debug("MirrorSendOperation::afterRollback, refs:{}", refs); + refs.forEach(this::doBeforeRollback); + } + + // forEach callback + private void doBeforeRollback(MessageReference ref) { + OperationContext localCTX = ref.getProtocolData(OperationContext.class); + if (localCTX != null) { + localCTX.replicationDone(); + } + } + + @Override + public void afterCommit(Transaction tx) { + logger.debug("MirrorSendOperation::afterCommit refs:{}", refs); + refs.forEach(this::doAfterCommit); + } + + // forEach callback + private void doAfterCommit(MessageReference ref) { + PostOfficeImpl.processReference(ref, false); } - Message message = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason); - route(server, message); - ref.getMessage().usageDown(); } private Message createMessage(SimpleString address, SimpleString queue, Object event, String brokerID, Object body) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 74edd0a42e..1140f5c8b4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.connect.mirror; -import java.util.List; import java.util.function.BooleanSupplier; import java.util.function.ToIntFunction; @@ -279,6 +278,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement return AddressInfo.fromJSON(body); } + @Override + public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception { + // NO-OP + } + @Override public void addAddress(AddressInfo addressInfo) throws Exception { logger.debug("{} adding address {}", server, addressInfo); @@ -359,11 +363,12 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) { - if (logger.isTraceEnabled()) { - logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}", nodeID, messageID, targetQueue.getName()); - } MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); + if (logger.isTraceEnabled()) { + logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). Ref={}", nodeID, messageID, targetQueue.getName(), reference); + } + if (reference == null) { if (logger.isDebugEnabled()) { logger.debug("Retrying Reference not found on messageID={}, nodeID={}, currentRetry={}", messageID, nodeID, retry); @@ -490,7 +495,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } @Override - public void sendMessage(Message message, RoutingContext context, List refs) { + public void sendMessage(Transaction tx, Message message, RoutingContext context) { // Do nothing } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java index b0a8605fa4..d82560cace 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect.mirror; import java.util.HashMap; import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.utils.collections.NodeStore; @@ -112,7 +113,12 @@ public class ReferenceNodeStore implements NodeStore { } public String getServerID(MessageReference element) { - Object nodeID = element.getMessage().getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); + return getServerID(element.getMessage()); + } + + + public String getServerID(Message message) { + Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); if (nodeID != null) { return nodeID.toString(); } else { @@ -124,7 +130,8 @@ public class ReferenceNodeStore implements NodeStore { } public long getID(MessageReference element) { - Long id = (Long) element.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + Message message = element.getMessage(); + Long id = getID(message); if (id == null) { return element.getMessageID(); } else { @@ -132,6 +139,10 @@ public class ReferenceNodeStore implements NodeStore { } } + private Long getID(Message message) { + return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + } + @Override public synchronized void clear() { lists.forEach((k, v) -> v.clear()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 2d85abaf84..98d62c46cb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -683,14 +683,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr frameBuffer.clear(); - DeliveryAnnotations deliveryAnnotationsToEncode; message.checkReference(reference); - if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) { - deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData(); - } else { - deliveryAnnotationsToEncode = null; - } + DeliveryAnnotations deliveryAnnotationsToEncode = reference.getProtocolData(DeliveryAnnotations.class); try { replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer)); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index d064d5036c..2db8d835d0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -295,7 +295,7 @@ public class AMQConsumer { //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat() dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID()); int size = dispatch.getMessage().getSize(); - reference.setProtocolData(dispatch.getMessage().getMessageId()); + reference.setProtocolData(MessageId.class, dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); currentWindow.decrementAndGet(); return size; @@ -337,7 +337,7 @@ public class AMQConsumer { // if it's browse only, nothing to be acked final boolean removeReferences = !serverConsumer.isBrowseOnly() && !serverConsumer.getQueue().isNonDestructive(); - final List ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData())); + final List ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData(MessageId.class)), reference -> lastID.equals(reference.getProtocolData(MessageId.class))); if (!ackList.isEmpty() || !removeReferences || serverConsumer.getQueue().isTemporary()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java index 227b669fc7..f71ee913a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java @@ -28,6 +28,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme boolean messageAcknowledgements = true; + boolean sync = false; + SimpleString mirrorSNF; String addressFilter; @@ -98,4 +100,12 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme return this; } + public boolean isSync() { + return sync; + } + + public AMQPMirrorBrokerConnectionElement setSync(boolean sync) { + this.sync = sync; + return this; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 669d76d94b..3bdb0ca706 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2121,10 +2121,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { boolean queueCreation = getBooleanAttribute(e2,"queue-creation", true); boolean durable = getBooleanAttribute(e2, "durable", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); + boolean sync = getBooleanAttribute(e2, "sync", false); String addressFilter = getAttributeValue(e2, "address-filter"); AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); - amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter); + amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync); connectionElement = amqpMirrorConnectionElement; connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 55c38c820a..0abc464360 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -26,14 +26,14 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.AbstractProtocolReference; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.utils.collections.LinkedListImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; -public class PagedReferenceImpl extends LinkedListImpl.Node implements PagedReference, Runnable { +public class PagedReferenceImpl extends AbstractProtocolReference implements PagedReference, Runnable { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -75,8 +75,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private boolean alreadyAcked; - private Object protocolData; - //0 is false, 1 is true, 2 not defined private static final byte IS_NOT_LARGE_MESSAGE = 0; private static final byte IS_LARGE_MESSAGE = 1; @@ -97,16 +95,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node private static final byte UNDEFINED_IS_DURABLE = -1; private byte durable = UNDEFINED_IS_DURABLE; - @Override - public Object getProtocolData() { - return protocolData; - } - - @Override - public void setProtocolData(Object protocolData) { - this.protocolData = protocolData; - } - @Override public Message getMessage() { return getPagedMessage().getMessage(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index 91681ff078..ceb402809c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -177,8 +177,12 @@ public class OperationContextImpl implements OperationContext { } } else { if (storeOnly) { - assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; - storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); + if (storeLined == stored && EXECUTORS_PENDING_UPDATER.get(this) == 0) { + executeNow = true; + } else { + assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true; + storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined)); + } } else { // ensure total ordering assert validateTasksAdd(storeLined, replicationLined, pageLined); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 8f82a172ea..4c368be6b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -224,4 +224,8 @@ public interface PostOffice extends ActiveMQComponent { default AddressManager getAddressManager() { return null; } + + default void preAcknowledge(final Transaction tx, final MessageReference ref, AckReason reason) { + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b9c60f1fb3..3950a4dcaa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -246,6 +246,18 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return this; } + @Override + public void preAcknowledge(final Transaction tx, final MessageReference ref, AckReason reason) { + if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // we don't send replacements on LVQ as they are replaced themselves on the target + try { + mirrorControllerSource.preAcknowledge(tx, ref, reason); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + } + + @Override public void postAcknowledge(MessageReference ref, AckReason reason) { if (mirrorControllerSource != null && reason != AckReason.REPLACED) { // we don't send replacements on LVQ as they are replaced themselves on the target @@ -1624,7 +1636,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (mirrorControllerSource != null && !context.isMirrorDisabled()) { // we check for isMirrorDisabled as to avoid recursive loop from there - mirrorControllerSource.sendMessage(message, context, refs); + mirrorControllerSource.sendMessage(tx, message, context); } @@ -1647,10 +1659,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private static void processReferences(List refs, boolean direct) { - for (MessageReference ref : refs) { - ref.getQueue().addTail(ref, direct); - } + public static void processReferences(List refs, boolean direct) { + refs.forEach((ref) -> processReference(ref, direct)); + } + + public static void processReference(MessageReference ref, boolean direct) { + ref.getQueue().addTail(ref, direct); } private void processRouteToDurableQueues(final Message message, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index daf4f90db1..dfb0c17b50 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -74,13 +74,13 @@ public interface MessageReference { * To be used on holding protocol specific data during the delivery. * This will be only valid while the message is on the delivering queue at the consumer */ - Object getProtocolData(); + T getProtocolData(Class typeClass); /** * To be used on holding protocol specific data during the delivery. * This will be only valid while the message is on the delivering queue at the consumer */ - void setProtocolData(Object data); + void setProtocolData(Class typeClass, T data); MessageReference copy(Queue queue); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java new file mode 100644 index 0000000000..35f3a07548 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl; + +import java.util.HashMap; + +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.utils.collections.LinkedListImpl; + +/** I need to store protocol specific data on the references. The same need exists in both PagedReference and MessageReferenceImpl. + * This class will serve the purpose to keep the specific protocol data for either reference. + * */ +public abstract class AbstractProtocolReference extends LinkedListImpl.Node implements MessageReference { + + private HashMap protocolDataMap; + + @Override + public T getProtocolData(Class classType) { + if (protocolDataMap == null) { + return null; + } else { + return (T)protocolDataMap.get(classType); + } + } + + @Override + public void setProtocolData(Class classType, T protocolData) { + if (protocolDataMap == null) { + protocolDataMap = new HashMap<>(); + } + protocolDataMap.put(classType, protocolData); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java index c0b0e02747..10163bdacd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java @@ -89,13 +89,13 @@ public class GroupFirstMessageReference implements MessageReference { } @Override - public Object getProtocolData() { - return messageReference.getProtocolData(); + public T getProtocolData(Class typeClass) { + return messageReference.getProtocolData(typeClass); } @Override - public void setProtocolData(Object data) { - messageReference.setProtocolData(data); + public void setProtocolData(Class typeClass, T data) { + messageReference.setProtocolData(typeClass, data); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 262689aa53..6d39c40372 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -27,12 +27,11 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.utils.collections.LinkedListImpl; /** * Implementation of a MessageReference */ -public class MessageReferenceImpl extends LinkedListImpl.Node implements MessageReference, Runnable { +public class MessageReferenceImpl extends AbstractProtocolReference implements MessageReference, Runnable { private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID(); @@ -78,8 +77,6 @@ public class MessageReferenceImpl extends LinkedListImpl.Node onDelivery; @@ -138,15 +135,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node refs); + void sendMessage(Transaction tx, Message message, RoutingContext context); void postAcknowledge(MessageReference ref, AckReason reason) throws Exception; + void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception; String getRemoteMirrorId(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java index d6c332a6b5..813ef5a098 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java @@ -37,4 +37,8 @@ public class TransactionPropertyIndexes { public static final int EXPIRY_LOGGER = 9; public static final int CONSUMER_METRICS_OPERATION = 10; + + public static final int MIRROR_ACK_OPERATION = 11; + + public static final int MIRROR_SEND_OPERATION = 12; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 13ee0ba73e..536dbf1b42 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.transaction.impl; import javax.transaction.xa.Xid; import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.LinkedList; import java.util.List; +import io.netty.util.collection.IntObjectHashMap; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; @@ -50,7 +50,7 @@ public class TransactionImpl implements Transaction { private static final int INITIAL_NUM_PROPERTIES = 11; - private Object[] properties = null; + private IntObjectHashMap properties = null; protected final StorageManager storageManager; @@ -72,22 +72,6 @@ public class TransactionImpl implements Transaction { private Object protocolData; - private void ensurePropertiesCapacity(int capacity) { - if (properties != null && properties.length >= capacity) { - return; - } - createOrEnlargeProperties(capacity); - } - - private void createOrEnlargeProperties(int capacity) { - if (properties == null) { - properties = new Object[Math.min(TransactionImpl.INITIAL_NUM_PROPERTIES, capacity)]; - } else { - assert properties.length < capacity; - properties = Arrays.copyOf(properties, capacity); - } - } - @Override public Object getProtocolData() { return protocolData; @@ -529,14 +513,17 @@ public class TransactionImpl implements Transaction { @Override public void putProperty(final int index, final Object property) { - ensurePropertiesCapacity(index + 1); - properties[index] = property; + if (properties == null) { + properties = new IntObjectHashMap(); + } + + properties.put(index, property); } @Override public Object getProperty(final int index) { - return properties == null ? null : (index < properties.length ? properties[index] : null); + return properties == null ? null : properties.get(index); } // Private diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index cc8c2cd538..9154d510fd 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2447,6 +2447,14 @@ + + + + If this is true, client blocking operations will be waiting a response from the mirror before the unblocking the operation. + This is false by default. + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index aa5088452e..581cc78ea8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -709,6 +709,11 @@ public class ConfigurationImplTest extends ActiveMQTestBase { @Test public void testAMQPConnectionsConfiguration() throws Throwable { + testAMQPConnectionsConfiguration(true); + testAMQPConnectionsConfiguration(false); + } + + private void testAMQPConnectionsConfiguration(boolean sync) throws Throwable { ConfigurationImpl configuration = new ConfigurationImpl(); Properties insertionOrderedProperties = new ConfigurationImpl.InsertionOrderedProperties(); @@ -723,6 +728,9 @@ public class ConfigurationImplTest extends ActiveMQTestBase { insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueCreation", "true"); insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueRemoval", "true"); insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.addressFilter", "foo"); + if (sync) { + insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.sync", "true"); + } // else we just use the default that is false configuration.parsePrefixedProperties(insertionOrderedProperties, null); @@ -742,6 +750,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase { Assert.assertEquals(true, amqpMirrorBrokerConnectionElement.isMessageAcknowledgements()); Assert.assertEquals(true, amqpMirrorBrokerConnectionElement.isQueueCreation()); Assert.assertEquals(true, amqpMirrorBrokerConnectionElement.isQueueRemoval()); + Assert.assertEquals(sync, ((AMQPMirrorBrokerConnectionElement) amqpBrokerConnectionElement).isSync()); Assert.assertEquals("foo", amqpMirrorBrokerConnectionElement.getAddressFilter()); } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 8fec6467f4..8adcc93cf2 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -442,7 +442,7 @@ - + diff --git a/docs/user-manual/en/amqp-broker-connections.md b/docs/user-manual/en/amqp-broker-connections.md index 667ab8ceff..68784e7eee 100644 --- a/docs/user-manual/en/amqp-broker-connections.md +++ b/docs/user-manual/en/amqp-broker-connections.md @@ -81,8 +81,7 @@ The previous example portrays a case of connection failure towards ServerA. The
## Mirroring -The idea of mirroring is to send events that happen on a broker towards another broker, without blocking any operations from producers and consumers, allowing them to keep operating as fast as possible. -It can be used for Disaster Recovery, and depending on the requirements even for failing over the data. +Mirroring will reproduce any operation that happened on the source brokers towards a target broker. The following events are sent through mirroring: @@ -94,6 +93,8 @@ The following events are sent through mirroring: * Queue and address creation. * Queue and address deletion. +By default every operation is sent asynchronously without blocking any clients. However if you set sync="true" on the mirror configuration, the clients will always wait a mirror on every blocking operation. + ### Mirror configuration Add a `` element within the `` element to configure mirroring to the target broker. @@ -119,9 +120,10 @@ The following optional arguments can be utilized: matches all addresses starting with 'eu' but not those starting with 'eu.uk' **Note:** - - Address exclusion will always take precedence over address inclusion. - Address matching on mirror elements is prefix-based and does not support wild-card matching. +* `sync`: By default is false. If set it to true any client blocking operation will be held until the mirror as confirmed receiving the operation. + * Notice that a disconnected node would hold all operations from the client. If you set sync=true you must reconnect a mirror before performing any operations. An example of a mirror configuration is shown below: ```xml diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java index 24ddf08d71..7c88310fbf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java @@ -77,7 +77,7 @@ public class AmqpReferenceDeliveryAnnotationTest extends AmqpClientTestSupport { Map symbolObjectMap = new HashMap<>(); DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(symbolObjectMap); symbolObjectMap.put(Symbol.getSymbol("KEY"), uuid); - reference.setProtocolData(deliveryAnnotations); + reference.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations); } }); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index c717bdaea5..841c8ec1a6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -681,9 +681,11 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { server_2.getConfiguration().setName("thisone"); AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); - AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks).setDurable(true); + replica.setName("theReplica"); amqpConnection.addElement(replica); server_2.getConfiguration().addAMQPConnection(amqpConnection); + server_2.getConfiguration().setName("server_2"); int NUMBER_OF_MESSAGES = 200; @@ -698,7 +700,6 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); if (!deferredStart) { Queue queueOnServer1 = locateQueue(server, getQueueName()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java new file mode 100644 index 0000000000..5b399cfe33 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.IOCompletion; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.JournalUpdateCallback; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; +import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQPSyncMirrorTest extends AmqpClientTestSupport { + + Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class); + + private static final String SLOW_SERVER_NAME = "slow"; + private static final int SLOW_SERVER_PORT = AMQP_PORT + 1; + + private ActiveMQServer slowServer; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Test + public void testPersistedSendAMQP() throws Exception { + testPersistedSend("AMQP", false, 100); + } + + @Test + public void testPersistedSendAMQPLarge() throws Exception { + testPersistedSend("AMQP", false, 200 * 1024); + } + + + @Test + public void testPersistedSendCore() throws Exception { + testPersistedSend("CORE", false, 100); + } + + @Test + public void testPersistedSendCoreLarge() throws Exception { + testPersistedSend("CORE", false, 200 * 1024); + } + + @Test + public void testPersistedSendAMQPTXLarge() throws Exception { + testPersistedSend("AMQP", true, 200 * 1024); + } + + @Test + public void testPersistedSendAMQPTX() throws Exception { + testPersistedSend("AMQP", true, 100); + } + + @Test + public void testPersistedSendCoreTX() throws Exception { + testPersistedSend("CORE", true, 100); + } + + @Test + public void testPersistedSendCoreTXLarge() throws Exception { + testPersistedSend("CORE", true, 200 * 1024); + } + + private void testPersistedSend(String protocol, boolean transactional, int messageSize) throws Exception { + ReusableLatch sendPending = new ReusableLatch(0); + Semaphore semSend = new Semaphore(1); + Semaphore semAck = new Semaphore(1); + AtomicInteger errors = new AtomicInteger(0); + + try { + final int NUMBER_OF_MESSAGES = 10; + + AtomicInteger countStored = new AtomicInteger(0); + + slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> { + if (logger.isDebugEnabled()) { + logger.debug("StorageCallback::slow isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, record); + } + if (transactional) { + if (isTX) { + try { + if (countStored.get() > 0) { + countStored.incrementAndGet(); + logger.debug("semSend.tryAcquire"); + if (semSend.tryAcquire(20, TimeUnit.SECONDS)) { + logger.debug("acquired TX, now release"); + semSend.release(); + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + } + if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) { + logger.debug("slow ACK REF"); + try { + if (semAck.tryAcquire(20, TimeUnit.SECONDS)) { + semAck.release(); + logger.debug("slow acquired ACK semaphore"); + } else { + logger.debug("Semaphore wasn't acquired"); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) { + try { + countStored.incrementAndGet(); + if (!transactional) { + logger.debug("semSend.tryAcquire"); + if (semSend.tryAcquire(20, TimeUnit.SECONDS)) { + logger.debug("acquired non TX now release"); + semSend.release(); + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } + } + }); + slowServer.setIdentity("slowServer"); + server.setIdentity("server"); + + ExecutorService pool = Executors.newFixedThreadPool(5); + runAfter(pool::shutdown); + + configureMirrorTowardsSlow(server); + + slowServer.getConfiguration().setName("slow"); + server.getConfiguration().setName("fast"); + slowServer.start(); + server.start(); + + waitForServerToStart(slowServer); + waitForServerToStart(server); + + server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false)); + + Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null); + Queue replicatedQueue = slowServer.locateQueue(getQueueName()); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + AMQP_PORT); + + if (factory instanceof ActiveMQConnectionFactory) { + ((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true); + } + + Connection connection = factory.createConnection(); + runAfter(connection::close); + Session session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + + connection.start(); + + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + final String bodyMessage; + { + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < messageSize; i++) { + buffer.append("large Buffer..."); + } + bodyMessage = buffer.toString(); + } + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + logger.debug("===>>> send message {}", i); + int theI = i; + sendPending.countUp(); + logger.debug("semSend.acquire"); + semSend.acquire(); + if (!transactional) { + pool.execute(() -> { + try { + logger.debug("Entering non TX send with sendPending = {}", sendPending.getCount()); + TextMessage message = session.createTextMessage(bodyMessage); + message.setStringProperty("strProperty", "" + theI); + producer.send(message); + sendPending.countDown(); + logger.debug("leaving non TX send with sendPending = {}", sendPending.getCount()); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } + }); + } else { + CountDownLatch sendDone = new CountDownLatch(1); + pool.execute(() -> { + try { + TextMessage message = session.createTextMessage(bodyMessage); + message.setStringProperty("strProperty", "" + theI); + producer.send(message); + } catch (Throwable e) { + errors.incrementAndGet(); + logger.warn(e.getMessage(), e); + } + sendDone.countDown(); + }); + + Wait.assertEquals(i, replicatedQueue::getMessageCount); + + Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS)); + + pool.execute(() -> { + try { + session.commit(); + sendPending.countDown(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + }); + } + + Assert.assertFalse("sendPending.await() not supposed to succeed", sendPending.await(10, TimeUnit.MILLISECONDS)); + logger.debug("semSend.release"); + semSend.release(); + Assert.assertTrue(sendPending.await(10, TimeUnit.SECONDS)); + Wait.assertEquals(i + 1, replicatedQueue::getMessageCount); + } + + if (!transactional) { + Wait.assertEquals(NUMBER_OF_MESSAGES, countStored::get); + } + Wait.assertEquals(NUMBER_OF_MESSAGES, replicatedQueue::getMessageCount); + + connection.start(); + + Session clientSession = transactional ? connection.createSession(true, Session.AUTO_ACKNOWLEDGE) : connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = clientSession.createConsumer(clientSession.createQueue(getQueueName())); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + logger.debug("===<<< Receiving message {}", i); + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + semAck.acquire(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + pool.execute(() -> { + try { + if (transactional) { + clientSession.commit(); + } else { + message.acknowledge(); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + countDownLatch.countDown(); + } + }); + + if (!transactional && protocol.equals("AMQP")) { + // non transactional ack in AMQP is always async. No need to verify anything else here + logger.debug("non transactional and amqp is always asynchronous. No need to verify anything"); + } else { + Assert.assertFalse(countDownLatch.await(10, TimeUnit.MILLISECONDS)); + } + + semAck.release(); + Assert.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + Wait.assertEquals(NUMBER_OF_MESSAGES - i - 1, replicatedQueue::getMessageCount); + } + + Assert.assertEquals(0, errors.get()); + } finally { + semAck.release(); + semSend.release(); + } + } + + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = createServerWithCallbackStorage(AMQP_PORT, "fastServer", (isUpdate, isTX, txId, id, recordType, persister, record) -> { + if (logger.isDebugEnabled()) { + logger.debug("StorageCallback::fast isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType, record); + } + }); + addServer(server); + return server; + } + + private void configureMirrorTowardsSlow(ActiveMQServer source) { + AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true); + connection.addElement(replication); + + source.getConfiguration().addAMQPConnection(connection); + } + + private ActiveMQServer createServerWithCallbackStorage(int port, String name, StorageCallback storageCallback) throws Exception { + ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration()); + ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(port), mBeanServer, securityManager) { + @Override + protected StorageManager createStorageManager() { + return AMQPSyncMirrorTest.this.createCallbackStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener, storageCallback); + } + }; + + server.getConfiguration().setName(name); + server.getConfiguration().getAcceptorConfigurations().clear(); + server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer, port)); + server.getConfiguration().setMessageExpiryScanPeriod(-1); + + server.getConfiguration().setJMXManagementEnabled(true); + + configureAddressPolicy(server); + configureBrokerSecurity(server); + + addServer(server); + + return server; + } + + private interface StorageCallback { + void storage(boolean isUpdate, + boolean isCommit, + long txID, + long id, + byte recordType, + Persister persister, + Object record); + } + + private StorageManager createCallbackStorageManager(Configuration configuration, + CriticalAnalyzer criticalAnalyzer, + ExecutorFactory executorFactory, + ScheduledExecutorService scheduledPool, + ExecutorFactory ioExecutorFactory, + IOCriticalErrorListener ioCriticalErrorListener, + StorageCallback storageCallback) { + return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) { + @Override + protected Journal createMessageJournal(Configuration config, + IOCriticalErrorListener criticalErrorListener, + int fileSize) { + return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) { + @Override + public void appendAddRecordTransactional(long txID, + long id, + byte recordType, + Persister persister, + Object record) throws Exception { + storageCallback.storage(false, false, txID, id, recordType, persister, record); + super.appendAddRecordTransactional(txID, id, recordType, persister, record); + } + + @Override + public void appendAddRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + IOCompletion callback) throws Exception { + storageCallback.storage(false, false, -1, id, recordType, persister, record); + super.appendAddRecord(id, recordType, persister, record, sync, callback); + } + + @Override + public void appendUpdateRecord(long id, + byte recordType, + EncodingSupport record, + boolean sync) throws Exception { + storageCallback.storage(true, false, -1, id, recordType, null, record); + super.appendUpdateRecord(id, recordType, record, sync); + } + + @Override + public void appendUpdateRecordTransactional(long txID, + long id, + byte recordType, + EncodingSupport record) throws Exception { + storageCallback.storage(true, false, txID, id, recordType, null, record); + super.appendUpdateRecordTransactional(txID, id, recordType, record); + } + + @Override + public void appendCommitRecord(long txID, + boolean sync, + IOCompletion callback, + boolean lineUpContext) throws Exception { + storageCallback.storage(false, true, txID, txID, (byte)0, null, null); + super.appendCommitRecord(txID, sync, callback, lineUpContext); + } + + @Override + public void tryAppendUpdateRecord(long id, + byte recordType, + Persister persister, + Object record, + boolean sync, + boolean replaceableUpdate, + JournalUpdateCallback updateCallback, + IOCompletion callback) throws Exception { + storageCallback.storage(true, false, -1, -1, recordType, persister, record); + super.tryAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback); + } + }; + } + }; + } +} \ No newline at end of file diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java index 0a690c2a78..8b332fdf02 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java @@ -94,6 +94,7 @@ public class ConfigurationValidationTest extends ActiveMQTestBase { Assert.assertFalse(mirrorConnectionElement.isQueueCreation()); Assert.assertFalse(mirrorConnectionElement.isQueueRemoval()); Assert.assertFalse(mirrorConnectionElement.isDurable()); + Assert.assertTrue(mirrorConnectionElement.isSync()); amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(1); @@ -104,6 +105,7 @@ public class ConfigurationValidationTest extends ActiveMQTestBase { Assert.assertFalse(mirrorConnectionElement.isDurable()); Assert.assertTrue(mirrorConnectionElement.isQueueCreation()); Assert.assertTrue(mirrorConnectionElement.isQueueRemoval()); + Assert.assertFalse(mirrorConnectionElement.isSync()); // checking the default amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(2); Assert.assertFalse(amqpBrokerConnectConfiguration.isAutostart());