diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 9355bbdff1..c64d287dff 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler; @@ -73,9 +73,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager im final Queue snfQueue; final ActiveMQServer server; - final ReferenceNodeStoreFactory idSupplier; + final ReferenceIDSupplier idSupplier; final boolean acks; final boolean addQueues; final boolean deleteQueues; @@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } } - public static void validateProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { + public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { setProtocolData(referenceIDSupplier, ref); } } /** This method will return the brokerID used by the message */ - private static String setProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref) { + private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) { String brokerID = referenceIDSupplier.getServerID(ref); long id = referenceIDSupplier.getID(ref); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index fa168005a0..47415cc049 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement DuplicateIDCache lruduplicateIDCache; String lruDuplicateIDKey; - private final ReferenceNodeStoreFactory referenceNodeStore; + private final ReferenceIDSupplier referenceNodeStore; OperationContext mirrorContext; @@ -374,7 +374,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); - if (reference == null) { if (logger.isDebugEnabled()) { logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java similarity index 87% rename from artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java rename to artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java index 2782f85175..6ae967c710 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java @@ -26,18 +26,23 @@ import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; -public class ReferenceNodeStoreFactory implements NodeStoreFactory { +/** + * Since Artemis 2.30.0 this is supplying a new NodeStore per queue. + * It is also parsing MessageReference and Message for the proper ID for the messages. + * @since 2.30.0 + */ +public class ReferenceIDSupplier implements NodeStoreFactory { final ActiveMQServer server; private final String serverID; - public ReferenceNodeStoreFactory(ActiveMQServer server) { + public ReferenceIDSupplier(ActiveMQServer server) { this.server = server; this.serverID = server.getNodeID().toString(); - } + /** This will return the NodeStore that will be used by the Queue. */ @Override public NodeStore newNodeStore() { return new ReferenceNodeStore(this); @@ -51,7 +56,6 @@ public class ReferenceNodeStoreFactory implements NodeStoreFactory { - private final ReferenceNodeStoreFactory factory; + private final ReferenceIDSupplier idSupplier; - public ReferenceNodeStore(ReferenceNodeStoreFactory factory) { - this.factory = factory; + public ReferenceNodeStore(ReferenceIDSupplier idSupplier) { + this.idSupplier = idSupplier; } // This is where the messages are stored by server id... @@ -67,7 +67,6 @@ public class ReferenceNodeStore implements NodeStore { } } - @Override public LinkedListImpl.Node getNode(String serverID, long id) { LongObjectHashMap> nodeMap = getMap(serverID); @@ -82,7 +81,7 @@ public class ReferenceNodeStore implements NodeStore { /** notice getMap should always return an instance. It should never return null. */ private synchronized LongObjectHashMap> getMap(String serverID) { if (serverID == null) { - serverID = factory.getDefaultNodeID(); + serverID = idSupplier.getDefaultNodeID(); } if (lruListID != null && lruListID.equals(serverID)) { @@ -105,15 +104,15 @@ public class ReferenceNodeStore implements NodeStore { } public String getServerID(MessageReference element) { - return factory.getServerID(element); + return idSupplier.getServerID(element); } public String getServerID(Message message) { - return factory.getServerID(message); + return idSupplier.getServerID(message); } public long getID(MessageReference element) { - return factory.getID(element); + return idSupplier.getID(element); } @Override @@ -132,5 +131,4 @@ public class ReferenceNodeStore implements NodeStore { } return size; } - -} +} \ No newline at end of file