diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index d9052c4623..7c7a63f18d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.MirrorOption; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AckReason; @@ -76,6 +77,12 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id"); public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst"); + /** When a clustered node (from regular cluster connections) receives a message + it will have target queues associated with it + this could be from message redistribution or simply load balancing. + an that case this will have the queue associated with it */ + public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q"); + // Capabilities public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror"); public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint"); @@ -83,7 +90,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString()); - private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true)); + private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled)); final Queue snfQueue; final ActiveMQServer server; @@ -228,13 +235,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im public void sendMessage(Transaction tx, Message message, RoutingContext context) { SimpleString address = context.getAddress(message); - if (invalidTarget(context.getMirrorSource())) { - logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server); + if (context.isInternal()) { + logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); return; } - if (context.isInternal()) { - logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); + if (invalidTarget(context.getMirrorSource())) { + logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server); return; } @@ -256,7 +263,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } MessageReference ref = MessageReference.Factory.createReference(message, snfQueue); - setProtocolData(ref, nodeID, idSupplier.getID(ref)); + setProtocolData(ref, nodeID, idSupplier.getID(ref), context); snfQueue.refUp(ref); @@ -330,12 +337,12 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im String brokerID = referenceIDSupplier.getServerID(ref); long id = referenceIDSupplier.getID(ref); - setProtocolData(ref, brokerID, id); + setProtocolData(ref, brokerID, id, null); return brokerID; } - private static void setProtocolData(MessageReference ref, String brokerID, long id) { + private static void setProtocolData(MessageReference ref, String brokerID, long id, RoutingContext routingContext) { Map daMap = new HashMap<>(); DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap); @@ -357,6 +364,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress()); } } + + if (routingContext != null && routingContext.isMirrorIndividualRoute()) { + ArrayList queues = new ArrayList<>(); + routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName()))); + daMap.put(TARGET_QUEUES, queues); + } + ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 1140f5c8b4..55f35bf645 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.connect.mirror; +import java.util.Collection; import java.util.function.BooleanSupplier; import java.util.function.ToIntFunction; @@ -29,6 +30,8 @@ import org.apache.activemq.artemis.core.io.RunnableCallback; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; @@ -73,6 +76,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirro import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES; public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController { @@ -436,6 +440,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement Long internalIDLong = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID); String internalAddress = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_DESTINATION); + Collection targetQueues = (Collection) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, TARGET_QUEUES); + long internalID = 0; if (internalIDLong != null) { @@ -479,8 +485,13 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement routingContext.setTransaction(transaction); duplicateIDCache.addToCache(duplicateIDBytes, transaction); - routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF); - server.getPostOffice().route(message, routingContext, false); + routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY); + if (targetQueues != null) { + targetQueuesRouting(message, routingContext, targetQueues); + server.getPostOffice().processRoute(message, routingContext, false); + } else { + server.getPostOffice().route(message, routingContext, false); + } // We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically transaction.commit(); flow(); @@ -489,6 +500,24 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement return true; } + /** When the source mirror receives messages from a cluster member of his own, it should then fill targetQueues so we could play the same semantic the source applied on its routing */ + private void targetQueuesRouting( final Message message, + final RoutingContext context, + final Collection queueNames) throws Exception { + Bindings bindings = server.getPostOffice().getBindingsForAddress(message.getAddressSimpleString()); + queueNames.forEach(name -> { + Binding binding = bindings.getBinding(name); + if (binding != null) { + try { + binding.route(message, context); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + }); + } + + @Override public void postAcknowledge(MessageReference ref, AckReason reason) { // Do nothing diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index fa0eed6256..c73d6e170f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.postoffice; import java.util.Collection; +import java.util.function.BiConsumer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -32,6 +33,8 @@ public interface Bindings extends UnproposalListener { Collection getBindings(); + Binding getBinding(String name); + void addBinding(Binding binding); Binding removeBindingByUniqueName(SimpleString uniqueName); @@ -55,4 +58,8 @@ public interface Bindings extends UnproposalListener { void route(Message message, RoutingContext context) throws Exception; boolean allowRedistribute(); + + void forEach(BiConsumer bindingConsumer); + + int size(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index a7f279fbd8..16072dbf60 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.MirrorOption; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; @@ -51,6 +52,7 @@ import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.function.BiConsumer; public final class BindingsImpl implements Bindings { @@ -119,6 +121,11 @@ public final class BindingsImpl implements Bindings { } } + @Override + public Binding getBinding(String name) { + return bindingsNameMap.get(SimpleString.toSimpleString(name)); + } + @Override public void addBinding(final Binding binding) { try { @@ -183,6 +190,17 @@ public final class BindingsImpl implements Bindings { return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION); } + @Override + public void forEach(BiConsumer bindingConsumer) { + bindingsNameMap.forEach(bindingConsumer); + } + + @Override + public int size() { + return bindingsNameMap.size(); + } + + @Override public Message redistribute(final Message message, final Queue originatingQueue, @@ -439,6 +457,10 @@ public final class BindingsImpl implements Bindings { } } + if (loadBalancingType.equals(MessageLoadBalancingType.LOCAL_ONLY) && binding instanceof RemoteQueueBinding) { + return false; + } + final Filter filter = binding.getFilter(); if (filter == null || filter.match(message)) { @@ -577,6 +599,9 @@ public final class BindingsImpl implements Bindings { private void routeFromCluster(final Message message, final RoutingContext context, final byte[] ids) throws Exception { + if (!context.isMirrorDisabled()) { + context.setMirrorOption(MirrorOption.individualRoute); + } byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS); List idsToAckList = new ArrayList<>(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java new file mode 100644 index 0000000000..0ac0181ece --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing. + * */ +public enum MirrorOption { + enabled, + disabled, + individualRoute +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index d95c7ae9d8..cd64a1f484 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -40,9 +41,15 @@ public interface RoutingContext { /** If the routing is from MirrorController, we don't redo mirrorController * to avoid*/ + MirrorOption getMirrorOption(); + + void forEachDurable(Consumer consumer); + + RoutingContext setMirrorOption(MirrorOption option); + boolean isMirrorDisabled(); - RoutingContext setMirrorDisabled(boolean mirrorDisabled); + boolean isMirrorIndividualRoute(); /** return true if every queue routed is internal */ boolean isInternal(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index b835c38ace..77a718ffee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -784,7 +784,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } else { // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never // actually routed to at that address though - queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true).setMaxConsumers(-1).setPurgeOnNoConsumers(false)); + queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true).setMaxConsumers(-1).setPurgeOnNoConsumers(false).setInternal(true)); } // There are a few things that will behave differently when it's an internal queue diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java index 59f2941912..e17faf0dc2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java @@ -17,5 +17,5 @@ package org.apache.activemq.artemis.core.server.cluster.impl; public enum MessageLoadBalancingType { - OFF, STRICT, ON_DEMAND, OFF_WITH_REDISTRIBUTION; + OFF, STRICT, ON_DEMAND, OFF_WITH_REDISTRIBUTION, LOCAL_ONLY; // notice that LOCAL_ONLY is an internal use only option. When Mirror sends a message to a target mirror, messages should be routed locally only and to not any other cluster. } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index cf06b78d4c..9259b8802d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3986,7 +3986,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration)); } - if (mirrorControllerService != null) { + if (mirrorControllerService != null && !queueConfiguration.isInternal()) { mirrorControllerService.createQueue(queueConfiguration); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f7ec7bf718..0c893eee6c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -82,6 +82,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.MirrorOption; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -707,6 +708,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.server = server; + if (queueConfiguration.isInternal()) { + this.internalQueue = queueConfiguration.isInternal(); + } + scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this); if (addressSettingsRepository != null) { @@ -3540,7 +3545,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // we Disable mirror on expiration as the target might be also expiring it // and this could cause races // we will only send the ACK for the expiration with the reason=EXPIRE and the expire will be played on the mirror side - context.setMirrorDisabled(true); + context.setMirrorOption(MirrorOption.disabled); } routingStatus = postOffice.route(copyMessage, context, false, rejectDuplicate, binding); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 1220190d12..e5dc83d27b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -22,9 +22,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.MirrorOption; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -36,7 +38,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction; public class RoutingContextImpl implements RoutingContext { - // The pair here is Durable and NonDurable private final Map map = new HashMap<>(); private Transaction transaction; @@ -63,7 +64,7 @@ public class RoutingContextImpl implements RoutingContext { volatile int version; - boolean mirrorDisabled = false; + MirrorOption mirrorOption = MirrorOption.enabled; private boolean duplicateDetection = true; @@ -85,13 +86,30 @@ public class RoutingContextImpl implements RoutingContext { } @Override - public boolean isMirrorDisabled() { - return mirrorDisabled; + public MirrorOption getMirrorOption() { + return mirrorOption; } @Override - public RoutingContextImpl setMirrorDisabled(boolean mirrorDisabled) { - this.mirrorDisabled = mirrorDisabled; + public boolean isMirrorDisabled() { + return mirrorOption == MirrorOption.disabled; + } + + @Override + public void forEachDurable(Consumer queueConsumer) { + map.forEach((a, b) -> { + b.getDurableQueues().forEach(queueConsumer); + }); + } + + @Override + public boolean isMirrorIndividualRoute() { + return mirrorOption == MirrorOption.individualRoute; + } + + @Override + public RoutingContextImpl setMirrorOption(MirrorOption mirrorOption) { + this.mirrorOption = mirrorOption; return this; } @@ -150,6 +168,10 @@ public class RoutingContextImpl implements RoutingContext { this.internalOnly = null; + if (mirrorOption == MirrorOption.individualRoute) { + mirrorOption = MirrorOption.enabled; + } + return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 1df10adde6..99bcee9746 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -725,6 +725,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { AuditLogger.createQueue(this, remotingConnection.getSubject(), remotingConnection.getRemoteAddress(), queueConfiguration); } + if (queueConfiguration.getAddress().equals(server.getConfiguration().getManagementNotificationAddress())) { + queueConfiguration.setInternal(true); + } + queueConfiguration .setRoutingType(getRoutingTypeFromPrefix(queueConfiguration.getAddress(), queueConfiguration.getRoutingType())) .setAddress(removePrefix(queueConfiguration.getAddress())) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index a8a4488aaf..0de24670fc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -22,7 +22,6 @@ import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Session; -import javax.management.MBeanServer; import java.net.URI; import java.util.Collections; import java.util.HashMap; @@ -41,7 +40,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; @@ -78,25 +76,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport { protected static final Symbol SHARED = Symbol.getSymbol("shared"); protected static final Symbol GLOBAL = Symbol.getSymbol("global"); - protected static final String BROKER_NAME = "localhost"; - protected static final String NETTY_ACCEPTOR = "netty-acceptor"; - - protected String noprivUser = "noprivs"; - protected String noprivPass = "noprivs"; - - protected String browseUser = "browser"; - protected String browsePass = "browser"; - - protected String guestUser = "guest"; - protected String guestPass = "guest"; - - protected String fullUser = "user"; - protected String fullPass = "pass"; - protected ActiveMQServer server; - protected MBeanServer mBeanServer = createMBeanServer(); - @Before @Override public void setUp() throws Exception { @@ -170,68 +151,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport { return createServer(port, true); } - protected ActiveMQServer createServer(int port, boolean start) throws Exception { - - final ActiveMQServer server = this.createServer(true, true); - - server.getConfiguration().getAcceptorConfigurations().clear(); - server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port)); - server.getConfiguration().setName(BROKER_NAME); - server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port); - server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port); - server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port); - if (port == AMQP_PORT) { - // we use the default large directory if the default port - // as some tests will assert number of files - server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory()); - } else { - server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory() + port); - } - server.getConfiguration().setJMXManagementEnabled(true); - server.getConfiguration().setMessageExpiryScanPeriod(100); - server.setMBeanServer(mBeanServer); - - // Add any additional Acceptors needed for tests - addAdditionalAcceptors(server); - - // Address configuration - configureAddressPolicy(server); - - // Add optional security for tests that need it - configureBrokerSecurity(server); - - // Add extra configuration - addConfiguration(server); - - if (start) { - server.start(); - - // Prepare all addresses and queues for client tests. - createAddressAndQueues(server); - } - - return server; - } - - protected void addConfiguration(ActiveMQServer server) { - - } - - protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) { - HashMap params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port)); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols()); - HashMap amqpParams = new HashMap<>(); - configureAMQPAcceptorParameters(amqpParams); - TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams); - configureAMQPAcceptorParameters(tc); - return tc; - } - - protected String getConfiguredProtocols() { - return "AMQP,OPENWIRE"; - } - + @Override protected void configureAddressPolicy(ActiveMQServer server) { // Address configuration AddressSettings addressSettings = new AddressSettings(); @@ -252,6 +172,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport { } } + @Override protected void createAddressAndQueues(ActiveMQServer server) throws Exception { // Default Queue server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST)); @@ -272,10 +193,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport { } } - protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { - // None by default - } - + @Override protected void configureBrokerSecurity(ActiveMQServer server) { if (isSecurityEnabled()) { enableSecurity(server); @@ -313,14 +231,6 @@ public class AmqpClientTestSupport extends AmqpTestSupport { server.getConfiguration().setSecurityEnabled(true); } - protected void configureAMQPAcceptorParameters(Map params) { - // None by default - } - - protected void configureAMQPAcceptorParameters(TransportConfiguration tc) { - // None by default - } - public Queue getProxyToQueue(String queueName) { return server.locateQueue(SimpleString.toSimpleString(queueName)); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index 3532caa7f4..d7605620e8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.management.MBeanServer; import java.net.URI; +import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -35,6 +41,21 @@ import org.junit.After; */ public class AmqpTestSupport extends ActiveMQTestBase { + protected static final String BROKER_NAME = "localhost"; + protected static final String NETTY_ACCEPTOR = "netty-acceptor"; + + protected String noprivUser = "noprivs"; + protected String noprivPass = "noprivs"; + + protected String browseUser = "browser"; + protected String browsePass = "browser"; + + protected String guestUser = "guest"; + protected String guestPass = "guest"; + + protected String fullUser = "user"; + protected String fullPass = "pass"; + protected static final int AMQP_PORT = 5672; protected LinkedList connections = new LinkedList<>(); @@ -47,6 +68,8 @@ public class AmqpTestSupport extends ActiveMQTestBase { return connection; } + protected MBeanServer mBeanServer = createMBeanServer(); + @After @Override public void tearDown() throws Exception { @@ -148,4 +171,83 @@ public class AmqpTestSupport extends ActiveMQTestBase { } + protected ActiveMQServer createServer(int port, boolean start) throws Exception { + + final ActiveMQServer server = this.createServer(true, true); + + server.getConfiguration().getAcceptorConfigurations().clear(); + server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port)); + server.getConfiguration().setName(BROKER_NAME); + server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port); + server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port); + server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port); + if (port == AMQP_PORT) { + // we use the default large directory if the default port + // as some tests will assert number of files + server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory()); + } else { + server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory() + port); + } + server.getConfiguration().setJMXManagementEnabled(true); + server.getConfiguration().setMessageExpiryScanPeriod(100); + server.setMBeanServer(mBeanServer); + + // Add any additional Acceptors needed for tests + addAdditionalAcceptors(server); + + // Address configuration + configureAddressPolicy(server); + + // Add optional security for tests that need it + configureBrokerSecurity(server); + + // Add extra configuration + addConfiguration(server); + + if (start) { + server.start(); + + // Prepare all addresses and queues for client tests. + createAddressAndQueues(server); + } + + return server; + } + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + } + + protected void addConfiguration(ActiveMQServer server) { + } + + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + } + + protected void configureAddressPolicy(ActiveMQServer server) { + } + + protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) { + HashMap params = new HashMap<>(); + params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port)); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols()); + HashMap amqpParams = new HashMap<>(); + configureAMQPAcceptorParameters(amqpParams); + TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams); + configureAMQPAcceptorParameters(tc); + return tc; + } + + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE"; + } + + + protected void configureAMQPAcceptorParameters(Map params) { + } + + protected void configureAMQPAcceptorParameters(TransportConfiguration tc) { + } + + protected void configureBrokerSecurity(ActiveMQServer server) { + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java new file mode 100644 index 0000000000..f52218910a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MirrorOption; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQPRedistributeClusterTest extends AmqpTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String QUEUE_NAME = "REDIST_QUEUE"; + private static final String TOPIC_NAME = "REDIST_TOPIC"; + private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC"); + + protected static final int A_1_PORT = 5673; + protected static final int A_2_PORT = 5674; + + ActiveMQServer a1; + ActiveMQServer a2; + + protected static final int B_1_PORT = 5773; + protected static final int B_2_PORT = 5774; + + ActiveMQServer b1; + ActiveMQServer b2; + + @Before + public void setCluster() throws Exception { + a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT); + a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT); + + a1.start(); + a2.start(); + + b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1); + b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1); + + b1.start(); + b2.start(); + } + + private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception { + ActiveMQServer server = createServer(thisPort, false); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST))); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST)); + server.getConfiguration().clearAddressSettings(); + server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0)); + + + server.setIdentity(name); + server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort); + + ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode")); + server.getConfiguration().addClusterConfiguration(clusterConfiguration); + + if (mirrorPort > 0) { + server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort)))); + } + + return server; + } + + @Test + public void testQueueRedistributionAMQP() throws Exception { + internalQueueRedistribution("AMQP"); + } + + @Test + public void testQueueRedistributionCORE() throws Exception { + internalQueueRedistribution("CORE"); + } + + public void internalQueueRedistribution(String protocol) throws Exception { + AssertionLoggerHandler.startCapture(); + runAfter((AssertionLoggerHandler::stopCapture)); + + ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT); + ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT); + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + } + + try (Connection connA1 = cfA1.createConnection(); + Connection connA2 = cfA2.createConnection()) { + + connA1.start(); + connA2.start(); + + Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < 100; i++) { + MessageConsumer consumer; + String place; + if (i % 2 == 0) { + place = "A1"; + consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME)); + } else { + place = "A2"; + consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME)); + } + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + logger.debug("Received message {} from {}", message, place); + consumer.close(); + } + } + + assertEmptyQueue(a1.locateQueue(QUEUE_NAME)); + assertEmptyQueue(a2.locateQueue(QUEUE_NAME)); + assertEmptyQueue(b1.locateQueue(QUEUE_NAME)); + assertEmptyQueue(b2.locateQueue(QUEUE_NAME)); + + // if you see this message, most likely the notifications are being copied to the mirror + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196")); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037")); + } + + @Test + public void testTopicRedistributionAMQP() throws Exception { + internalTopicRedistribution("AMQP"); + } + + @Test + public void testTopicRedistributionCORE() throws Exception { + internalTopicRedistribution("CORE"); + } + + public void internalTopicRedistribution(String protocol) throws Exception { + + AssertionLoggerHandler.startCapture(); + runAfter((AssertionLoggerHandler::stopCapture)); + + final int numMessages = 100; + + String subscriptionName = "my-topic-shared-subscription"; + + ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT); + ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT); + + Topic topic; + + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName); + consumer.close(); + } + + try (Connection conn = cfA2.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName); + consumer.close(); + } + + Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + + // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name + String subscriptionQueueName; + + { + HashSet subscriptionSet = new HashSet<>(); + // making sure the queues created on a1 are propaged into b1 + a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> { + logger.debug("{} = {}", n, b); + if (b instanceof LocalQueueBinding) { + QueueBinding qb = (QueueBinding) b; + subscriptionSet.add(qb.getUniqueName().toString()); + Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null); + } + }); + Assert.assertEquals(1, subscriptionSet.size()); + subscriptionQueueName = subscriptionSet.iterator().next(); + } + + // making sure the queues created on a2 are propaged into b2 + a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> { + logger.debug("{} = {}", n, b); + if (b instanceof LocalQueueBinding) { + QueueBinding qb = (QueueBinding) b; + Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null); + } + }); + + Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName); + Assert.assertNotNull(a1TopicSubscription); + Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName); + Assert.assertNotNull(a2TopicSubscription); + Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName); + Assert.assertNotNull(b1TopicSubscription); + Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName); + Assert.assertNotNull(a2); + + + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < numMessages; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + } + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196")); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037")); + + Assert.assertEquals(0, a1TopicSubscription.getConsumerCount()); + Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount); + Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount); + + logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), b2TopicSubscription.getMessageCount()); + + Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount); + Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount); + + try (Connection connA1 = cfA1.createConnection(); + Connection connA2 = cfA2.createConnection()) { + + connA1.start(); + connA2.start(); + + Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < numMessages; i++) { + MessageConsumer consumer; + String place; + if (i % 2 == 0) { + place = "A1"; + consumer = sessionA1.createSharedDurableConsumer(topic, subscriptionName); + } else { + place = "A2"; + consumer = sessionA2.createSharedDurableConsumer(topic, subscriptionName); + } + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + logger.debug("Received message {} from {}", message, place); + consumer.close(); + } + } + + // if you see this message, most likely the notifications are being copied to the mirror + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196")); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037")); + + assertEmptyQueue(a1TopicSubscription); + assertEmptyQueue(a2TopicSubscription); + assertEmptyQueue(b1TopicSubscription); + assertEmptyQueue(b2TopicSubscription); + } + + // This test is playing with Remote binding routing, similarly to how topic redistribution would happen + @Test + public void testRemoteBindingRouting() throws Exception { + final String protocol = "AMQP"; + String subscriptionName = "my-topic-shared-subscription"; + + ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT); + ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT); + + Topic topic; + + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + for (int i = 0; i < 10; i++) { + session.createSharedDurableConsumer(topic, subscriptionName + "_" + i); + } + } + + try (Connection conn = cfA2.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + for (int i = 0; i < 10; i++) { + session.createSharedDurableConsumer(topic, subscriptionName + "_" + i); + } + } + + Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + + List remoteQueueBindings_a2 = new ArrayList<>(); + // making sure the queues created on a2 are propaged into b2 + a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> { + if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) { + logger.debug("{} = {}", a, b); + remoteQueueBindings_a2.add((RemoteQueueBinding) b); + } + }); + + Assert.assertEquals(1, remoteQueueBindings_a2.size()); + + RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute); + + Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512); + directMessage.setAddress(TOPIC_NAME); + directMessage.putStringProperty("Test", "t1"); + remoteQueueBindings_a2.get(0).route(directMessage, routingContext); + a2.getPostOffice().processRoute(directMessage, routingContext, false); + routingContext.getTransaction().commit(); + + for (int i = 0; i < 10; i++) { + String name = "my-topic-shared-subscription_" + i + ":global"; + Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount); + logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount()); + logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount()); + logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount()); + logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount()); + Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount); + Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount); + Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount); + } + } + + // This test has distinct subscriptions on each node and it is making sure the Mirror Routing is working accurately + @Test + public void testMultiNodeSubscription() throws Exception { + final String protocol = "AMQP"; + String subscriptionName = "my-topic-shared-subscription"; + + ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT); + ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT); + + Topic topic; + + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + for (int i = 0; i < 10; i++) { + session.createSharedDurableConsumer(topic, subscriptionName + "_" + i); + } + } + + try (Connection conn = cfA2.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + for (int i = 10; i < 20; i++) { + session.createSharedDurableConsumer(topic, subscriptionName + "_" + i); + } + } + + Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20); + + + try (Connection conn = cfA2.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + MessageProducer producer = session.createProducer(topic); + producer.send(session.createTextMessage("hello")); + } + + for (int i = 0; i < 10; i++) { + String name = "my-topic-shared-subscription_" + i + ":global"; + Wait.waitFor(() -> a1.locateQueue(name) != null); + Wait.waitFor(() -> b1.locateQueue(name) != null); + Wait.assertEquals(1, a1.locateQueue(name)::getMessageCount); + Wait.assertEquals(1, b1.locateQueue(name)::getMessageCount); + } + for (int i = 10; i < 20; i++) { + String name = "my-topic-shared-subscription_" + i + ":global"; + Wait.waitFor(() -> a2.locateQueue(name) != null); + Wait.waitFor(() -> b2.locateQueue(name) != null); + Wait.assertEquals(1, a2.locateQueue(name)::getMessageCount); + Wait.assertEquals(1, b2.locateQueue(name)::getMessageCount); + } + } + + private void assertEmptyQueue(Queue queue) { + Assert.assertNotNull(queue); + try { + Wait.assertEquals(0, queue::getMessageCount); + } catch (Throwable e) { + if (e instanceof AssertionError) { + throw (AssertionError) e; + } else { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE"; + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 282c8176d0..33463ed3d1 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -46,6 +46,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.function.BiConsumer; /** * This test is replicating the behaviour from https://issues.jboss.org/browse/HORNETQ-988. @@ -464,6 +465,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { } + @Override + public Binding getBinding(String name) { + return null; + } + + @Override + public void forEach(BiConsumer bindingConsumer) { + bindings.forEach(bindingConsumer); + } + + @Override + public int size() { + return bindings.size(); + } + @Override public MessageLoadBalancingType getMessageLoadBalancingType() { return null;