ARTEMIS-4366 Some Adjustment to class names on Mirror
This is in relation to comments from PR #4555
This commit is contained in:
parent
24dde9d4b5
commit
29deb30c73
|
@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||||
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
|
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory;
|
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
|
||||||
|
@ -73,9 +73,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
|
|
||||||
private final ActiveMQServer server;
|
private final ActiveMQServer server;
|
||||||
|
|
||||||
// We must use one referenceIDSupplier per server.
|
private ReferenceIDSupplier referenceIDSupplier;
|
||||||
// protocol manager is the perfect aggregation for that.
|
|
||||||
private ReferenceNodeStoreFactory referenceIDSupplier;
|
|
||||||
|
|
||||||
private final ProtonProtocolManagerFactory factory;
|
private final ProtonProtocolManagerFactory factory;
|
||||||
|
|
||||||
|
@ -125,11 +123,11 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
routingHandler = new AMQPRoutingHandler(server);
|
routingHandler = new AMQPRoutingHandler(server);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() {
|
public synchronized ReferenceIDSupplier getReferenceIDSupplier() {
|
||||||
if (referenceIDSupplier == null) {
|
if (referenceIDSupplier == null) {
|
||||||
// we lazy start the instance.
|
// we lazy start the instance.
|
||||||
// only create it when needed
|
// only create it when needed
|
||||||
referenceIDSupplier = new ReferenceNodeStoreFactory(server);
|
referenceIDSupplier = new ReferenceIDSupplier(server);
|
||||||
}
|
}
|
||||||
return referenceIDSupplier;
|
return referenceIDSupplier;
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
||||||
|
|
||||||
final Queue snfQueue;
|
final Queue snfQueue;
|
||||||
final ActiveMQServer server;
|
final ActiveMQServer server;
|
||||||
final ReferenceNodeStoreFactory idSupplier;
|
final ReferenceIDSupplier idSupplier;
|
||||||
final boolean acks;
|
final boolean acks;
|
||||||
final boolean addQueues;
|
final boolean addQueues;
|
||||||
final boolean deleteQueues;
|
final boolean deleteQueues;
|
||||||
|
@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void validateProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
|
public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
|
||||||
if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
|
if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
|
||||||
setProtocolData(referenceIDSupplier, ref);
|
setProtocolData(referenceIDSupplier, ref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** This method will return the brokerID used by the message */
|
/** This method will return the brokerID used by the message */
|
||||||
private static String setProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref) {
|
private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) {
|
||||||
String brokerID = referenceIDSupplier.getServerID(ref);
|
String brokerID = referenceIDSupplier.getServerID(ref);
|
||||||
long id = referenceIDSupplier.getID(ref);
|
long id = referenceIDSupplier.getID(ref);
|
||||||
|
|
||||||
|
|
|
@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
|
||||||
DuplicateIDCache lruduplicateIDCache;
|
DuplicateIDCache lruduplicateIDCache;
|
||||||
String lruDuplicateIDKey;
|
String lruDuplicateIDKey;
|
||||||
|
|
||||||
private final ReferenceNodeStoreFactory referenceNodeStore;
|
private final ReferenceIDSupplier referenceNodeStore;
|
||||||
|
|
||||||
OperationContext mirrorContext;
|
OperationContext mirrorContext;
|
||||||
|
|
||||||
|
@ -374,7 +374,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
|
||||||
|
|
||||||
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
|
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
|
||||||
|
|
||||||
|
|
||||||
if (reference == null) {
|
if (reference == null) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry);
|
logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry);
|
||||||
|
|
|
@ -26,18 +26,23 @@ import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
|
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
|
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
|
||||||
|
|
||||||
public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> {
|
/**
|
||||||
|
* Since Artemis 2.30.0 this is supplying a new NodeStore per queue.
|
||||||
|
* It is also parsing MessageReference and Message for the proper ID for the messages.
|
||||||
|
* @since 2.30.0
|
||||||
|
*/
|
||||||
|
public class ReferenceIDSupplier implements NodeStoreFactory<MessageReference> {
|
||||||
|
|
||||||
final ActiveMQServer server;
|
final ActiveMQServer server;
|
||||||
|
|
||||||
private final String serverID;
|
private final String serverID;
|
||||||
|
|
||||||
public ReferenceNodeStoreFactory(ActiveMQServer server) {
|
public ReferenceIDSupplier(ActiveMQServer server) {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.serverID = server.getNodeID().toString();
|
this.serverID = server.getNodeID().toString();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** This will return the NodeStore that will be used by the Queue. */
|
||||||
@Override
|
@Override
|
||||||
public NodeStore<MessageReference> newNodeStore() {
|
public NodeStore<MessageReference> newNodeStore() {
|
||||||
return new ReferenceNodeStore(this);
|
return new ReferenceNodeStore(this);
|
||||||
|
@ -51,7 +56,6 @@ public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageRefere
|
||||||
return getServerID(element.getMessage());
|
return getServerID(element.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public String getServerID(Message message) {
|
public String getServerID(Message message) {
|
||||||
Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
|
Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
|
||||||
if (nodeID != null) {
|
if (nodeID != null) {
|
||||||
|
@ -77,6 +81,4 @@ public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageRefere
|
||||||
private Long getID(Message message) {
|
private Long getID(Message message) {
|
||||||
return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
|
return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
|
||||||
|
|
||||||
public class ReferenceNodeStore implements NodeStore<MessageReference> {
|
public class ReferenceNodeStore implements NodeStore<MessageReference> {
|
||||||
|
|
||||||
private final ReferenceNodeStoreFactory factory;
|
private final ReferenceIDSupplier idSupplier;
|
||||||
|
|
||||||
public ReferenceNodeStore(ReferenceNodeStoreFactory factory) {
|
public ReferenceNodeStore(ReferenceIDSupplier idSupplier) {
|
||||||
this.factory = factory;
|
this.idSupplier = idSupplier;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is where the messages are stored by server id...
|
// This is where the messages are stored by server id...
|
||||||
|
@ -67,7 +67,6 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LinkedListImpl.Node<MessageReference> getNode(String serverID, long id) {
|
public LinkedListImpl.Node<MessageReference> getNode(String serverID, long id) {
|
||||||
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> nodeMap = getMap(serverID);
|
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> nodeMap = getMap(serverID);
|
||||||
|
@ -82,7 +81,7 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
|
||||||
/** notice getMap should always return an instance. It should never return null. */
|
/** notice getMap should always return an instance. It should never return null. */
|
||||||
private synchronized LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String serverID) {
|
private synchronized LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String serverID) {
|
||||||
if (serverID == null) {
|
if (serverID == null) {
|
||||||
serverID = factory.getDefaultNodeID();
|
serverID = idSupplier.getDefaultNodeID();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lruListID != null && lruListID.equals(serverID)) {
|
if (lruListID != null && lruListID.equals(serverID)) {
|
||||||
|
@ -105,15 +104,15 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getServerID(MessageReference element) {
|
public String getServerID(MessageReference element) {
|
||||||
return factory.getServerID(element);
|
return idSupplier.getServerID(element);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getServerID(Message message) {
|
public String getServerID(Message message) {
|
||||||
return factory.getServerID(message);
|
return idSupplier.getServerID(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getID(MessageReference element) {
|
public long getID(MessageReference element) {
|
||||||
return factory.getID(element);
|
return idSupplier.getID(element);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -132,5 +131,4 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
|
||||||
}
|
}
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue