From 169d6317d9393a061daefb689bbc4054b27f68f9 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 31 Jan 2024 15:21:33 -0500 Subject: [PATCH] ARTEMIS-4626 Improve demand tracking for federation consumers Implement idempotent tracking for tracking demand at start while also tracking demand as consumer or bindings are added and removed. --- .../internal/FederationAddressEntry.java | 92 ++++ .../FederationAddressPolicyManager.java | 308 ++++++----- ...erEntry.java => FederationQueueEntry.java} | 48 +- .../FederationQueuePolicyManager.java | 49 +- .../AMQPFederationAddressPolicyTest.java | 511 +++++++++++++++++- .../AMQPFederationQueuePolicyTest.java | 87 +++ 6 files changed, 926 insertions(+), 169 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java rename artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/{FederationConsumerEntry.java => FederationQueueEntry.java} (61%) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java new file mode 100644 index 0000000000..6a8faf3310 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressEntry.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.federation.internal; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.artemis.core.postoffice.Binding; + +/** + * Am entry type class used to hold a {@link FederationConsumerInternal} and + * any other state data needed by the manager that is creating them based on the + * policy configuration for the federation instance. The entry can be extended + * by federation implementation to hold additional state data for the federation + * consumer and the managing of its lifetime. + * + * This entry type provides a reference counter that can be used to register demand + * on a federation resource such that it is not torn down until all demand has been + * removed from the local resource. + */ +public class FederationAddressEntry { + + private final FederationConsumerInternal consumer; + + private final Set demandBindings = new HashSet<>(); + + /** + * Creates a new address entry with a single reference + * + * @param consumer + * The federation consumer that will be carried in this entry. + */ + public FederationAddressEntry(FederationConsumerInternal consumer) { + this.consumer = consumer; + } + + /** + * @return the address that this entry is acting to federate. + */ + public String getAddress() { + return consumer.getConsumerInfo().getAddress(); + } + + /** + * @return the consumer managed by this entry + */ + public FederationConsumerInternal getConsumer() { + return consumer; + } + + /** + * @return true if there are bindings that are mapped to this federation entry. + */ + public boolean hasDemand() { + return !demandBindings.isEmpty(); + } + + /** + * Add demand on this federation address consumer from the given binding. + * + * @return this federation address consumer entry. + */ + public FederationAddressEntry addDemand(Binding binding) { + demandBindings.add(binding); + return this; + } + + /** + * Reduce demand on this federation address consumer from the given binding. + * + * @return this federation address consumer entry. + */ + public FederationAddressEntry removeDenamd(Binding binding) { + demandBindings.remove(binding); + return this; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java index 01a8a4866d..d28b3b4938 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java @@ -61,10 +61,10 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected final ActiveMQServer server; - protected final FederationReceiveFromAddressPolicy policy; - protected final Map remoteConsumers = new HashMap<>(); protected final FederationInternal federation; - protected final Map> matchingDiverts = new HashMap<>(); + protected final FederationReceiveFromAddressPolicy policy; + protected final Map remoteConsumers = new HashMap<>(); + protected final Map> matchingDiverts = new HashMap<>(); private volatile boolean started; @@ -75,7 +75,6 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi this.federation = federation; this.policy = addressPolicy; this.server = federation.getServer(); - this.server.registerBrokerPlugin(this); } /** @@ -108,84 +107,72 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi } @Override - public synchronized void afterAddAddress(AddressInfo addressInfo, boolean reload) { - if (started && policy.isEnableDivertBindings() && policy.test(addressInfo)) { - try { - // A Divert can exist in configuration prior to the address having been auto created - // etc so upon address add this check needs to be run to capture addresses that now - // match the divert. - server.getPostOffice() - .getDirectBindings(addressInfo.getName()) - .stream().filter(binding -> binding instanceof DivertBinding) - .forEach(this::afterAddBinding); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e); - } - } - } - - @Override - public synchronized void afterAddBinding(Binding binding) { + public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException { if (started) { - checkBindingForMatch(binding); - } - } + if (binding instanceof QueueBinding) { + final FederationAddressEntry entry = remoteConsumers.get(binding.getAddress().toString()); - @Override - public synchronized void beforeRemoveBinding(SimpleString bindingName, Transaction tx, boolean deleteData) { - final Binding binding = server.getPostOffice().getBinding(bindingName); - final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress()); + if (entry != null) { + // This is QueueBinding that was mapped to a federated address so we can directly remove + // demand from the federation consumer and close it if demand is now gone. + tryRemoveDemandOnAddress(entry, binding); + } else if (policy.isEnableDivertBindings()) { + // See if there is any matching diverts that are forwarding to an address where this QueueBinding + // is bound and remove the mapping for any matches, diverts can have a composite set of address + // forwards so each divert must be checked in turn to see if it contains the address the removed + // binding was bound to. + matchingDiverts.entrySet().forEach(divertEntry -> { + final String sourceAddress = divertEntry.getKey().getAddress().toString(); + final SimpleString forwardAddress = divertEntry.getKey().getDivert().getForwardAddress(); - if (binding instanceof QueueBinding) { - tryRemoveDemandOnAddress(addressInfo); + if (isAddressInDivertForwards(binding.getAddress(), forwardAddress)) { + // Try and remove the queue binding from the set of registered bindings + // for the divert and if that removes all mapped bindings then we can + // remove the divert from the federated address entry and check if that + // removed all local demand which means we can close the consumer. + divertEntry.getValue().remove(binding); - if (policy.isEnableDivertBindings()) { - // See if there is any matching diverts that match this queue binding and remove demand now that - // the queue is going away. Since a divert can be composite we need to check for a match of the - // queue address on each of the forwards if there are any. - matchingDiverts.entrySet().forEach(entry -> { - final SimpleString forwardAddress = entry.getKey().getDivert().getForwardAddress(); - - if (isAddressInDivertForwards(binding.getAddress(), forwardAddress)) { - final AddressInfo srcAddressInfo = server.getPostOffice().getAddressInfo(entry.getKey().getAddress()); - - if (entry.getValue().remove(((QueueBinding) binding).getQueue().getName())) { - tryRemoveDemandOnAddress(srcAddressInfo); + if (divertEntry.getValue().isEmpty()) { + tryRemoveDemandOnAddress(remoteConsumers.get(sourceAddress), divertEntry.getKey()); + } } - } - }); - } - } else if (policy.isEnableDivertBindings() || binding instanceof DivertBinding) { - final DivertBinding divertBinding = (DivertBinding) binding; - final Set matchingQueues = matchingDiverts.remove(binding); + }); + } + } else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) { + final DivertBinding divert = (DivertBinding) binding; - // Each entry in the matching queues set is one instance of demand that was - // registered on the source address which would have been federated from the - // remote so on remove we deduct each and if that removes all demand the remote - // consumer will be closed. - if (matchingQueues != null) { - try { - matchingQueues.forEach((queueName) -> tryRemoveDemandOnAddress(addressInfo)); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divertBinding.getDivert().getForwardAddress(), e); + if (matchingDiverts.remove(divert) != null) { + // The divert binding is treated as one unit of demand on a federated address and + // when the divert is removed that unit of demand is removed regardless of existing + // bindings still remaining on the divert forwards. If the divert demand was the + // only thing keeping the federated address consumer open this will result in it + // being closed. + try { + tryRemoveDemandOnAddress(remoteConsumers.get(divert.getAddress().toString()), divert); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divert.getDivert().getForwardAddress(), e); + } } } } } - protected final void tryRemoveDemandOnAddress(AddressInfo addressInfo) { - final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo); - final FederationConsumerEntry entry = remoteConsumers.get(consumerInfo); + protected final void tryRemoveDemandOnAddress(FederationAddressEntry entry, Binding binding) { + if (entry != null) { + entry.removeDenamd(binding); - if (entry != null && entry.reduceDemand()) { - final FederationConsumerInternal federationConsuner = entry.getConsumer(); + logger.trace("Reducing demand on federated address {}, remaining demand? {}", entry.getAddress(), entry.hasDemand()); - try { - signalBeforeCloseFederationConsumer(federationConsuner); - federationConsuner.close(); - signalAfterCloseFederationConsumer(federationConsuner); - } finally { - remoteConsumers.remove(consumerInfo); + if (!entry.hasDemand()) { + final FederationConsumerInternal federationConsuner = entry.getConsumer(); + + try { + signalBeforeCloseFederationConsumer(federationConsuner); + federationConsuner.close(); + signalAfterCloseFederationConsumer(federationConsuner); + } finally { + remoteConsumers.remove(entry.getAddress()); + } } } } @@ -199,17 +186,61 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi protected final void scanAllBindings() { server.getPostOffice() .getAllBindings() - .filter(bind -> bind instanceof QueueBinding || (policy.isEnableDivertBindings() && bind instanceof DivertBinding)) - .forEach(bind -> checkBindingForMatch(bind)); + .filter(binding -> binding instanceof QueueBinding || (policy.isEnableDivertBindings() && binding instanceof DivertBinding)) + .forEach(binding -> afterAddBinding(binding)); } + @Override + public synchronized void afterAddAddress(AddressInfo addressInfo, boolean reload) { + if (started && policy.isEnableDivertBindings() && policy.test(addressInfo)) { + try { + // A Divert can exist in configuration prior to the address having been auto created + // etc so upon address add this check needs to be run to capture addresses that now + // match the divert. + server.getPostOffice() + .getDirectBindings(addressInfo.getName()) + .stream().filter(binding -> binding instanceof DivertBinding) + .forEach(this::checkBindingForMatch); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e); + } + } + } + + @Override + public synchronized void afterAddBinding(Binding binding) { + if (started) { + checkBindingForMatch(binding); + } + } + + /** + * Called under lock this method should check if the given {@link Binding} matches the + * configured address federation policy and federate the address if so. The incoming + * {@link Binding} can be either a {@link QueueBinding} or a {@link DivertBinding} so + * the code should check both. + * + * @param binding + * The binding that should be checked against the federated address policy, + */ protected final void checkBindingForMatch(Binding binding) { if (binding instanceof QueueBinding) { final QueueBinding queueBinding = (QueueBinding) binding; final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress()); - reactIfBindingMatchesPolicy(addressInfo, queueBinding); - reactIfQueueBindingMatchesAnyDivertTarget(queueBinding); + if (testIfAddressMatchesPolicy(addressInfo)) { + // A plugin can block address federation for a given queue and if another queue + // binding does trigger address federation we don't want to track the rejected + // queue as demand so we always run this check before trying to create the address + // consumer. + if (isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) { + return; + } + + createOrUpdateFederatedAddressConsumerForBinding(addressInfo, queueBinding); + } else { + reactIfQueueBindingMatchesAnyDivertTarget(queueBinding); + } } else if (binding instanceof DivertBinding) { reactIfAnyQueueBindingMatchesDivertTarget((DivertBinding) binding); } @@ -229,7 +260,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi // We only need to check if we've never seen the divert before, afterwards we will // be checking it any time a new QueueBinding is added instead. if (matchingDiverts.get(divertBinding) == null) { - final Set matchingQueues = new HashSet<>(); + final Set matchingQueues = new HashSet<>(); matchingDiverts.put(divertBinding, matchingQueues); // We must account for the composite divert case by splitting the address and @@ -240,16 +271,25 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi try { for (SimpleString forward : forwardAddresses) { server.getPostOffice().getBindingsForAddress(forward).getBindings() - .stream().filter(b -> b instanceof QueueBinding) + .stream() + .filter(b -> b instanceof QueueBinding) .map(b -> (QueueBinding) b) .forEach(queueBinding -> { + // The plugin can block the demand totally here either based on the divert itself + // or the queue that's attached to the divert. if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) { return; } - if (reactIfBindingMatchesPolicy(addressInfo, queueBinding)) { - matchingQueues.add(queueBinding.getQueue().getName()); + // The plugin can block the demand selectively based on a single queue attached to + // the divert target(s). + if (isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) { + return; } + + matchingQueues.add(queueBinding); + + createOrUpdateFederatedAddressConsumerForBinding(addressInfo, divertBinding); }); } } catch (Exception e) { @@ -264,7 +304,6 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi } final SimpleString queueAddress = queueBinding.getAddress(); - final SimpleString queueName = queueBinding.getQueue().getName(); matchingDiverts.entrySet().forEach((e) -> { final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress(); @@ -274,28 +313,35 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi // addresses (composite diverts) of the Divert and if so then we can check if we need // to create demand on the source address on the remote if we haven't done so already. - if (!e.getValue().contains(queueName) && isAddressInDivertForwards(queueAddress, forwardAddress)) { + if (!e.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) { + // The plugin can block the demand totally here either based on the divert itself + // or the queue that's attached to the divert. if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) { return; } + // The plugin can block the demand selectively based on a single queue attached to + // the divert target(s). + if (isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) { + return; + } + + // Each divert that forwards to the address the queue is bound to we add demand + // in the diverts tracker. + e.getValue().add(queueBinding); + final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress()); - // We know it matches address policy at this point but we don't yet know if any other - // remote demand exists and we want to check here if the react method did indeed add - // demand on the address and if so add this queue into the diverts matching queues set. - if (reactIfBindingMatchesPolicy(addressInfo, queueBinding)) { - e.getValue().add(queueName); - } + createOrUpdateFederatedAddressConsumerForBinding(addressInfo, divertBinding); } }); } - private static boolean isAddressInDivertForwards(final SimpleString queueAddress, final SimpleString forwardAddress) { + private static boolean isAddressInDivertForwards(final SimpleString targetAddress, final SimpleString forwardAddress) { final SimpleString[] forwardAddresses = forwardAddress.split(','); for (SimpleString forward : forwardAddresses) { - if (queueAddress.equals(forward)) { + if (targetAddress.equals(forward)) { return true; } } @@ -303,60 +349,45 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi return false; } - protected final boolean reactIfBindingMatchesPolicy(AddressInfo address, QueueBinding binding) { - if (testIfAddressMatchesPolicy(address)) { - logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", address, binding); + protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo address, Binding binding) { + logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", address, binding); + + // Check for existing consumer add demand from a additional local consumer to ensure + // the remote consumer remains active until all local demand is withdrawn. The federation + // plugin can block creation of the federation consumer at this stage. + if (remoteConsumers.containsKey(address.getName().toString())) { + logger.trace("Federation Address Policy manager found existing demand for address: {}, adding demand", address); + remoteConsumers.get(address.getName().toString()).addDemand(binding); + } else if (!isPluginBlockingFederationConsumerCreate(address)) { + logger.trace("Federation Address Policy manager creating remote consumer for address: {}", address); final FederationConsumerInfo consumerInfo = createConsumerInfo(address); + final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo); + final FederationAddressEntry entry = createConsumerEntry(queueConsumer); - // Check for existing consumer add demand from a additional local consumer - // to ensure the remote consumer remains active until all local demand is - // withdrawn. - if (remoteConsumers.containsKey(consumerInfo)) { - logger.trace("Federation Address Policy manager found existing demand for address: {}", address); - remoteConsumers.get(consumerInfo).addDemand(); - } else { - if (isPluginBlockingFederationConsumerCreate(address)) { - return false; - } + signalBeforeCreateFederationConsumer(consumerInfo); - if (isPluginBlockingFederationConsumerCreate(binding.getQueue())) { - return false; - } - - logger.trace("Federation Address Policy manager creating remote consumer for address: {}", address); - - signalBeforeCreateFederationConsumer(consumerInfo); - - final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo); - final FederationConsumerEntry entry = createConsumerEntry(queueConsumer); - - // Handle remote close with remove of consumer which means that future demand will - // attempt to create a new consumer for that demand. Ensure that thread safety is - // accounted for here as the notification can be asynchronous. - queueConsumer.setRemoteClosedHandler((closedConsumer) -> { - synchronized (this) { - try { - remoteConsumers.remove(closedConsumer.getConsumerInfo()); - } finally { - closedConsumer.close(); - } + // Handle remote close with remove of consumer which means that future demand will + // attempt to create a new consumer for that demand. Ensure that thread safety is + // accounted for here as the notification can be asynchronous. + queueConsumer.setRemoteClosedHandler((closedConsumer) -> { + synchronized (this) { + try { + remoteConsumers.remove(closedConsumer.getConsumerInfo().getAddress()); + } finally { + closedConsumer.close(); } - }); + } + }); - // Called under lock so state should stay in sync - remoteConsumers.put(consumerInfo, entry); + // Called under lock so state should stay in sync + remoteConsumers.put(entry.getAddress(), entry.addDemand(binding)); - // Now that we are tracking it we can start it - queueConsumer.start(); + // Now that we are tracking it we can start it + queueConsumer.start(); - signalAfterCreateFederationConsumer(queueConsumer); - } - - return true; + signalAfterCreateFederationConsumer(queueConsumer); } - - return false; } /** @@ -386,17 +417,18 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi protected abstract FederationConsumerInfo createConsumerInfo(AddressInfo address); /** - * Creates a {@link FederationConsumerEntry} instance that will be used to store a {@link FederationConsumer} - * along with other state data needed to manage a federation consumer instance. A subclass can override - * this method to return a more customized entry type with additional state data. + * Creates a {@link FederationAddressEntry} instance that will be used to store an instance of + * an {@link FederationConsumer} along with other state data needed to manage a federation consumer + * instance lifetime. A subclass can override this method to return a more customized entry type with + * additional state data. * * @param consumer * The {@link FederationConsumerInternal} instance that will be housed in this entry. * - * @return a new {@link FederationConsumerEntry} that holds the given federation consumer. + * @return a new {@link FederationAddressEntry} that holds the given federation consumer. */ - protected FederationConsumerEntry createConsumerEntry(FederationConsumerInternal consumer) { - return new FederationConsumerEntry(consumer); + protected FederationAddressEntry createConsumerEntry(FederationConsumerInternal consumer) { + return new FederationAddressEntry(consumer); } /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationConsumerEntry.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java similarity index 61% rename from artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationConsumerEntry.java rename to artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java index 2abbdc2798..1bbe4384cf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationConsumerEntry.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueueEntry.java @@ -17,6 +17,11 @@ package org.apache.activemq.artemis.protocol.amqp.federation.internal; +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.artemis.core.server.ServerConsumer; + /** * Am entry type class used to hold a {@link FederationConsumerInternal} and * any other state data needed by the manager that is creating them based on the @@ -28,19 +33,19 @@ package org.apache.activemq.artemis.protocol.amqp.federation.internal; * on a federation resource such that it is not torn down until all demand has been * removed from the local resource. */ -public class FederationConsumerEntry { +public class FederationQueueEntry { private final FederationConsumerInternal consumer; - private int references = 1; + private final Set consumerDemand = new HashSet<>(); /** - * Creates a new consumer entry with a single reference + * Creates a new queue entry with a single reference * * @param consumer * The federation consumer that will be carried in this entry. */ - public FederationConsumerEntry(FederationConsumerInternal consumer) { + public FederationQueueEntry(FederationConsumerInternal consumer) { this.consumer = consumer; } @@ -52,10 +57,23 @@ public class FederationConsumerEntry { } /** - * Add additional demand on the resource associated with this entries consumer. + * @return true if there are bindings that are mapped to this federation entry. */ - public void addDemand() { - references++; + public boolean hasDemand() { + return !consumerDemand.isEmpty(); + } + + /** + * Add additional demand on the resource associated with this entries consumer. + * + * @param consumer + * The {@link ServerConsumer} that generated the demand on federated resource. + * + * @return this federation queue entry instance. + */ + public FederationQueueEntry addDemand(ServerConsumer consumer) { + consumerDemand.add(identifyConsumer(consumer)); + return this; } /** @@ -63,9 +81,19 @@ public class FederationConsumerEntry { * and returns true when demand reaches zero which indicates the consumer should be * closed and the entry cleaned up. * - * @return true if demand has fallen to zero on the resource associated with the consumer. + * @param consumer + * The {@link ServerConsumer} that generated the demand on federated resource. + * + * @return this federation queue entry instance. */ - public boolean reduceDemand() { - return --references == 0; + public FederationQueueEntry reduceDemand(ServerConsumer consumer) { + consumerDemand.remove(identifyConsumer(consumer)); + return this; + } + + private static String identifyConsumer(ServerConsumer consumer) { + return consumer.getConnectionID().toString() + ":" + + consumer.getSessionID() + ":" + + consumer.getID(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java index c715ea0502..33c9031ec3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java @@ -53,7 +53,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons protected final ActiveMQServer server; protected final Predicate federationConsumerMatcher; protected final FederationReceiveFromQueuePolicy policy; - protected final Map remoteConsumers = new HashMap<>(); + protected final Map remoteConsumers = new HashMap<>(); protected final FederationInternal federation; private volatile boolean started; @@ -106,12 +106,20 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons } @Override - public synchronized void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { + public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) { if (started) { - final FederationConsumerInfo consumerInfo = createConsumerInfo(consumer); - final FederationConsumerEntry entry = remoteConsumers.get(consumerInfo); + final String queueName = consumer.getQueue().getName().toString(); + final FederationQueueEntry entry = remoteConsumers.get(queueName); - if (entry != null && entry.reduceDemand()) { + if (entry == null) { + return; + } + + entry.reduceDemand(consumer); + + logger.trace("Reducing demand on federated queue {}, remaining demand? {}", queueName, entry.hasDemand()); + + if (!entry.hasDemand()) { final FederationConsumerInternal federationConsuner = entry.getConsumer(); try { @@ -119,7 +127,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons federationConsuner.close(); signalAfterCloseFederationConsumer(federationConsuner); } finally { - remoteConsumers.remove(consumerInfo); + remoteConsumers.remove(queueName); } } } @@ -137,7 +145,8 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons queue.getConsumers() .stream() .filter(consumer -> consumer instanceof ServerConsumer) - .map(c -> (ServerConsumer) c).forEach(this::reactIfConsumerMatchesPolicy); + .map(c -> (ServerConsumer) c) + .forEach(this::reactIfConsumerMatchesPolicy); } protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { @@ -160,13 +169,16 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons // Check for existing consumer add demand from a additional local consumer // to ensure the remote consumer remains active until all local demand is // withdrawn. - if (remoteConsumers.containsKey(consumerInfo)) { - remoteConsumers.get(consumerInfo).addDemand(); + if (remoteConsumers.containsKey(consumerInfo.getQueueName())) { + logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", consumerInfo.getQueueName()); + remoteConsumers.get(consumerInfo.getQueueName()).addDemand(consumer); } else { + logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", consumerInfo.getQueueName()); + signalBeforeCreateFederationConsumer(consumerInfo); final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo); - final FederationConsumerEntry entry = createConsumerEntry(queueConsumer); + final FederationQueueEntry entry = createServerConsumerEntry(queueConsumer).addDemand(consumer); // Handle remote close with remove of consumer which means that future demand will // attempt to create a new consumer for that demand. Ensure that thread safety is @@ -174,7 +186,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons queueConsumer.setRemoteClosedHandler((closedConsumer) -> { synchronized (this) { try { - remoteConsumers.remove(closedConsumer.getConsumerInfo()); + remoteConsumers.remove(closedConsumer.getConsumerInfo().getQueueName()); } finally { closedConsumer.close(); } @@ -182,7 +194,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons }); // Called under lock so state should stay in sync - remoteConsumers.put(consumerInfo, entry); + remoteConsumers.put(consumerInfo.getQueueName(), entry); // Now that we are tracking it we can start it queueConsumer.start(); @@ -224,17 +236,18 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons } /** - * Creates a {@link FederationConsumerEntry} instance that will be used to store a {@link FederationConsumer} - * along with other state data needed to manage a federation consumer instance. A subclass can override - * this method to return a more customized entry type with additional state data. + * Creates a {@link FederationQueueEntry} instance that will be used to store an instance of + * a {@link FederationConsumer} along with other state data needed to manage a federation consumer + * instance. A subclass can override this method to return a more customized entry type with additional + * state data. * * @param consumer * The {@link FederationConsumerInternal} instance that will be housed in this entry. * - * @return a new {@link FederationConsumerEntry} that holds the given federation consumer. + * @return a new {@link FederationQueueEntry} that holds the given federation consumer. */ - protected FederationConsumerEntry createConsumerEntry(FederationConsumerInternal consumer) { - return new FederationConsumerEntry(consumer); + protected FederationQueueEntry createServerConsumerEntry(FederationConsumerInternal consumer) { + return new FederationQueueEntry(consumer); } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index 82c36295c3..05d4708eea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -54,6 +54,12 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -66,6 +72,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -77,9 +84,14 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin; +import org.apache.activemq.artemis.protocol.amqp.federation.Federation; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; @@ -180,14 +192,15 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); - // Should be no frames generated as we already federated the address and the staticly added + // Should be no frames generated as we already federated the address and the statically added // queue should retain demand when this consumer leaves. try (Connection connection = factory.createConnection()) { final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = session.createConsumer(session.createTopic("test")); + + session.createConsumer(session.createTopic("test")); + session.createConsumer(session.createTopic("test")); connection.start(); - consumer.close(); } peer.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -2265,6 +2278,420 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { } } + @Test(timeout = 20000) + public void testFederationStartedTriggersRemoteDemandWithExistingAddressBindings() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes("test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setAutostart(false); + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.MULTICAST) + .setAddress("test") + .setAutoCreated(false)); + + Wait.assertTrue(() -> server.queueQuery(SimpleString.toSimpleString("test")).isExists()); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Create demand on the addresses so that on start federation should happen + final Connection connection = factory.createConnection(); + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + + session.createConsumer(session.createTopic("test")); + session.createConsumer(session.createTopic("test")); + + // Add other non-federation address bindings for the policy to check on start. + session.createConsumer(session.createTopic("a1")); + session.createConsumer(session.createTopic("a2")); + + connection.start(); + + // Should be no interactions at this point, check to make sure. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString("sample-federation"), + containsString("test"), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + // Starting the broker connection should trigger federation of address with demand. + server.getBrokerConnections().forEach(c -> { + try { + c.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Add more demand while federation is starting + session.createConsumer(session.createTopic("test")); + session.createConsumer(session.createTopic("test")); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // This removes the connection demand, but leaves behind the static queue + connection.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // This should trigger the federation consumer to be shutdown as the statically defined queue + // should be the only remaining demand on the address. + logger.info("Removing Queues from federated address to eliminate demand"); + server.destroyQueue(SimpleString.toSimpleString("test")); + Wait.assertFalse(() -> server.queueQuery(SimpleString.toSimpleString("test")).isExists()); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + + @Test(timeout = 20000) + public void testFederationStartedTriggersRemoteDemandWithExistingAddressAndDivertBindings() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes("test"); + receiveFromAddress.setEnableDivertBindings(true); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setAutostart(false); + amqpConnection.addElement(element); + + final DivertConfiguration divert = new DivertConfiguration(); + divert.setName("test-divert"); + divert.setAddress("test"); + divert.setExclusive(false); + divert.setForwardingAddress("target"); + divert.setRoutingType(ComponentConfigurationRoutingType.MULTICAST); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + // Configure addresses and divert for the test + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST)); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target"), RoutingType.MULTICAST)); + server.deployDivert(divert); + + // Create demand on the addresses so that on start federation should happen + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + final Connection connection = factory.createConnection(); + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic test = session.createTopic("test"); + final Topic target = session.createTopic("target"); + + session.createConsumer(test); + session.createConsumer(test); + + session.createConsumer(target); + session.createConsumer(target); + + // Add other non-federation address bindings for the policy to check on start. + session.createConsumer(session.createTopic("a1")); + session.createConsumer(session.createTopic("a2")); + + connection.start(); + + // Should be no interactions at this point, check to make sure. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString("sample-federation"), + containsString("test"), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + // Starting the broker connection should trigger federation of address with demand. + server.getBrokerConnections().forEach(c -> { + try { + c.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Add more demand while federation is starting + session.createConsumer(test); + session.createConsumer(target); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // This removes the connection demand, but leaves behind the static queue + connection.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + + @Test(timeout = 20000) + public void testFederationStartTriggersFederationWithMultipleDivertsAndRemainsActiveAfterOneRemoved() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes("test"); + receiveFromAddress.setEnableDivertBindings(true); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setAutostart(false); + amqpConnection.addElement(element); + + final DivertConfiguration divert1 = new DivertConfiguration(); + divert1.setName("test-divert-1"); + divert1.setAddress("test"); + divert1.setExclusive(false); + divert1.setForwardingAddress("target1,target2"); + divert1.setRoutingType(ComponentConfigurationRoutingType.MULTICAST); + + final DivertConfiguration divert2 = new DivertConfiguration(); + divert2.setName("test-divert-2"); + divert2.setAddress("test"); + divert2.setExclusive(false); + divert2.setForwardingAddress("target1,target3"); + divert2.setRoutingType(ComponentConfigurationRoutingType.MULTICAST); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + // Configure addresses and divert for the test + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST)); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target1"), RoutingType.MULTICAST)); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target2"), RoutingType.MULTICAST)); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target3"), RoutingType.MULTICAST)); + server.deployDivert(divert1); + server.deployDivert(divert2); + + // Create demand on the addresses so that on start federation should happen + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + final Connection connection = factory.createConnection(); + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic target1 = session.createTopic("target1"); + final Topic target2 = session.createTopic("target2"); + final Topic target3 = session.createTopic("target2"); + + session.createConsumer(target1); + session.createConsumer(target2); + session.createConsumer(target3); + + connection.start(); + + // Should be no interactions at this point, check to make sure. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString("sample-federation"), + containsString("test"), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + // Starting the broker connection should trigger federation of address with demand. + server.getBrokerConnections().forEach(c -> { + try { + c.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Add more demand while federation is starting + session.createConsumer(target1); + session.createConsumer(target2); + session.createConsumer(target3); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.destroyDivert(SimpleString.toSimpleString(divert1.getName())); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + server.destroyDivert(SimpleString.toSimpleString(divert2.getName())); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + connection.close(); + + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + + @Test(timeout = 20000) + public void testFederationPluginCanLimitDemandToOnlyTheConfiguredDivert() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes("test"); + receiveFromAddress.setEnableDivertBindings(true); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalAddressPolicy(receiveFromAddress); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(5); + amqpConnection.addElement(element); + + final DivertConfiguration divert = new DivertConfiguration(); + divert.setName("test-divert-1"); + divert.setAddress("test"); + divert.setExclusive(false); + divert.setForwardingAddress("target"); + divert.setRoutingType(ComponentConfigurationRoutingType.MULTICAST); + + final AMQPTestFederationBrokerPlugin federationPlugin = new AMQPTestFederationBrokerPlugin(); + federationPlugin.shouldCreateConsumerForDivert = (d, q) -> true; + federationPlugin.shouldCreateConsumerForQueue = (q) -> { + // Disallow any binding on the source address from creating federation demand + // any other binding that matches the policy will create demand for federation. + if (q.getAddress().toString().equals("test")) { + return false; + } + + return true; + }; + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + // Configure addresses and divert for the test + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST)); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target"), RoutingType.MULTICAST)); + server.deployDivert(divert); + server.registerBrokerPlugin(federationPlugin); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + + // Should be ignored as plugin rejects this binding as demand. + final MessageConsumer consumer = session.createConsumer(session.createTopic("test")); + + connection.start(); + + // Get a round trip to the broker to allow time for federation to hopefully + // reject this first consumer on the source address. + consumer.receiveNoWait(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(allOf(containsString("sample-federation"), + containsString("test"), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond() + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + + session.createConsumer(session.createTopic("target")); + + // Now a federation receiver should get created for the divert demand. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + public static class ApplicationPropertiesTransformer implements Transformer { private final Map properties = new HashMap<>(); @@ -2372,4 +2799,82 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { .withOfferedCapability(FEDERATION_CONTROL_LINK.toString()); peer.expectFlow(); } + + private class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin { + + public final AtomicBoolean started = new AtomicBoolean(); + public final AtomicBoolean stopped = new AtomicBoolean(); + + public final AtomicReference beforeCreateConsumerCapture = new AtomicReference<>(); + public final AtomicReference afterCreateConsumerCapture = new AtomicReference<>(); + public final AtomicReference beforeCloseConsumerCapture = new AtomicReference<>(); + public final AtomicReference afterCloseConsumerCapture = new AtomicReference<>(); + + public Consumer beforeCreateConsumer = (c) -> beforeCreateConsumerCapture.set(c);; + public Consumer afterCreateConsumer = (c) -> afterCreateConsumerCapture.set(c); + public Consumer beforeCloseConsumer = (c) -> beforeCloseConsumerCapture.set(c); + public Consumer afterCloseConsumer = (c) -> afterCloseConsumerCapture.set(c); + + public BiConsumer beforeMessageHandled = (c, m) -> { }; + public BiConsumer afterMessageHandled = (c, m) -> { }; + + public Function shouldCreateConsumerForAddress = (a) -> true; + public Function shouldCreateConsumerForQueue = (q) -> true; + public BiFunction shouldCreateConsumerForDivert = (d, q) -> true; + + @Override + public void federationStarted(final Federation federation) throws ActiveMQException { + started.set(true); + } + + @Override + public void federationStopped(final Federation federation) throws ActiveMQException { + stopped.set(true); + } + + @Override + public void beforeCreateFederationConsumer(final FederationConsumerInfo consumerInfo) throws ActiveMQException { + beforeCreateConsumer.accept(consumerInfo); + } + + @Override + public void afterCreateFederationConsumer(final FederationConsumer consumer) throws ActiveMQException { + afterCreateConsumer.accept(consumer); + } + + @Override + public void beforeCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException { + beforeCloseConsumer.accept(consumer); + } + + @Override + public void afterCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException { + afterCloseConsumer.accept(consumer); + } + + @Override + public void beforeFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException { + beforeMessageHandled.accept(consumer, message); + } + + @Override + public void afterFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException { + afterMessageHandled.accept(consumer, message); + } + + @Override + public boolean shouldCreateFederationConsumerForAddress(final AddressInfo address) throws ActiveMQException { + return shouldCreateConsumerForAddress.apply(address); + } + + @Override + public boolean shouldCreateFederationConsumerForQueue(final org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException { + return shouldCreateConsumerForQueue.apply(queue); + } + + @Override + public boolean shouldCreateFederationConsumerForDivert(Divert divert, org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException { + return shouldCreateConsumerForDivert.apply(divert, queue); + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index 68242342a3..545fc465fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -2600,6 +2600,93 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { } } + @Test(timeout = 20000) + public void testFederationCreatesQueueReceiverLinkForQueueAfterBrokerConnectionStarted() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes("test", "test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName("sample-federation"); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setAutostart(false); + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST) + .setAddress("test") + .setAutoCreated(false)); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + final Connection connection = factory.createConnection(); + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + session.createConsumer(queue); + session.createConsumer(queue); + session.createConsumer(queue); + + connection.start(); + + // Should be no interactions yet, check to be sure + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.getBrokerConnections().forEach(c -> { + try { + c.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Add another demand consumer immediately after start to add more events + // on the federation plugin for broker events. + session.createConsumer(queue); + + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(allOf(containsString("sample-federation"), + containsString("test::test"), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .respond() + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()); + peer.expectFlow().withLinkCredit(1000); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); + + // Add more demand after the broker connection starts + session.createConsumer(queue); + session.createConsumer(queue); + + // This should remove all demand on the queue and federation should stop + connection.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + public static class ApplicationPropertiesTransformer implements Transformer { private final Map properties = new HashMap<>();