ARTEMIS-4626 Improve demand tracking for federation consumers

Implement idempotent tracking for tracking demand at start while also
tracking demand as consumer or bindings are added and removed.
This commit is contained in:
Timothy Bish 2024-01-31 15:21:33 -05:00 committed by clebertsuconic
parent 9d988dd9d0
commit 169d6317d9
6 changed files with 926 additions and 169 deletions

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.federation.internal;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.core.postoffice.Binding;
/**
* Am entry type class used to hold a {@link FederationConsumerInternal} and
* any other state data needed by the manager that is creating them based on the
* policy configuration for the federation instance. The entry can be extended
* by federation implementation to hold additional state data for the federation
* consumer and the managing of its lifetime.
*
* This entry type provides a reference counter that can be used to register demand
* on a federation resource such that it is not torn down until all demand has been
* removed from the local resource.
*/
public class FederationAddressEntry {
private final FederationConsumerInternal consumer;
private final Set<Binding> demandBindings = new HashSet<>();
/**
* Creates a new address entry with a single reference
*
* @param consumer
* The federation consumer that will be carried in this entry.
*/
public FederationAddressEntry(FederationConsumerInternal consumer) {
this.consumer = consumer;
}
/**
* @return the address that this entry is acting to federate.
*/
public String getAddress() {
return consumer.getConsumerInfo().getAddress();
}
/**
* @return the consumer managed by this entry
*/
public FederationConsumerInternal getConsumer() {
return consumer;
}
/**
* @return true if there are bindings that are mapped to this federation entry.
*/
public boolean hasDemand() {
return !demandBindings.isEmpty();
}
/**
* Add demand on this federation address consumer from the given binding.
*
* @return this federation address consumer entry.
*/
public FederationAddressEntry addDemand(Binding binding) {
demandBindings.add(binding);
return this;
}
/**
* Reduce demand on this federation address consumer from the given binding.
*
* @return this federation address consumer entry.
*/
public FederationAddressEntry removeDenamd(Binding binding) {
demandBindings.remove(binding);
return this;
}
}

View File

@ -61,10 +61,10 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final ActiveMQServer server;
protected final FederationReceiveFromAddressPolicy policy;
protected final Map<FederationConsumerInfo, FederationConsumerEntry> remoteConsumers = new HashMap<>();
protected final FederationInternal federation;
protected final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();
protected final FederationReceiveFromAddressPolicy policy;
protected final Map<String, FederationAddressEntry> remoteConsumers = new HashMap<>();
protected final Map<DivertBinding, Set<QueueBinding>> matchingDiverts = new HashMap<>();
private volatile boolean started;
@ -75,7 +75,6 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
this.federation = federation;
this.policy = addressPolicy;
this.server = federation.getServer();
this.server.registerBrokerPlugin(this);
}
/**
@ -108,84 +107,72 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
}
@Override
public synchronized void afterAddAddress(AddressInfo addressInfo, boolean reload) {
if (started && policy.isEnableDivertBindings() && policy.test(addressInfo)) {
try {
// A Divert can exist in configuration prior to the address having been auto created
// etc so upon address add this check needs to be run to capture addresses that now
// match the divert.
server.getPostOffice()
.getDirectBindings(addressInfo.getName())
.stream().filter(binding -> binding instanceof DivertBinding)
.forEach(this::afterAddBinding);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e);
}
}
}
@Override
public synchronized void afterAddBinding(Binding binding) {
public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException {
if (started) {
checkBindingForMatch(binding);
}
}
if (binding instanceof QueueBinding) {
final FederationAddressEntry entry = remoteConsumers.get(binding.getAddress().toString());
@Override
public synchronized void beforeRemoveBinding(SimpleString bindingName, Transaction tx, boolean deleteData) {
final Binding binding = server.getPostOffice().getBinding(bindingName);
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
if (entry != null) {
// This is QueueBinding that was mapped to a federated address so we can directly remove
// demand from the federation consumer and close it if demand is now gone.
tryRemoveDemandOnAddress(entry, binding);
} else if (policy.isEnableDivertBindings()) {
// See if there is any matching diverts that are forwarding to an address where this QueueBinding
// is bound and remove the mapping for any matches, diverts can have a composite set of address
// forwards so each divert must be checked in turn to see if it contains the address the removed
// binding was bound to.
matchingDiverts.entrySet().forEach(divertEntry -> {
final String sourceAddress = divertEntry.getKey().getAddress().toString();
final SimpleString forwardAddress = divertEntry.getKey().getDivert().getForwardAddress();
if (binding instanceof QueueBinding) {
tryRemoveDemandOnAddress(addressInfo);
if (isAddressInDivertForwards(binding.getAddress(), forwardAddress)) {
// Try and remove the queue binding from the set of registered bindings
// for the divert and if that removes all mapped bindings then we can
// remove the divert from the federated address entry and check if that
// removed all local demand which means we can close the consumer.
divertEntry.getValue().remove(binding);
if (policy.isEnableDivertBindings()) {
// See if there is any matching diverts that match this queue binding and remove demand now that
// the queue is going away. Since a divert can be composite we need to check for a match of the
// queue address on each of the forwards if there are any.
matchingDiverts.entrySet().forEach(entry -> {
final SimpleString forwardAddress = entry.getKey().getDivert().getForwardAddress();
if (isAddressInDivertForwards(binding.getAddress(), forwardAddress)) {
final AddressInfo srcAddressInfo = server.getPostOffice().getAddressInfo(entry.getKey().getAddress());
if (entry.getValue().remove(((QueueBinding) binding).getQueue().getName())) {
tryRemoveDemandOnAddress(srcAddressInfo);
if (divertEntry.getValue().isEmpty()) {
tryRemoveDemandOnAddress(remoteConsumers.get(sourceAddress), divertEntry.getKey());
}
}
}
});
}
} else if (policy.isEnableDivertBindings() || binding instanceof DivertBinding) {
final DivertBinding divertBinding = (DivertBinding) binding;
final Set<SimpleString> matchingQueues = matchingDiverts.remove(binding);
});
}
} else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) {
final DivertBinding divert = (DivertBinding) binding;
// Each entry in the matching queues set is one instance of demand that was
// registered on the source address which would have been federated from the
// remote so on remove we deduct each and if that removes all demand the remote
// consumer will be closed.
if (matchingQueues != null) {
try {
matchingQueues.forEach((queueName) -> tryRemoveDemandOnAddress(addressInfo));
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divertBinding.getDivert().getForwardAddress(), e);
if (matchingDiverts.remove(divert) != null) {
// The divert binding is treated as one unit of demand on a federated address and
// when the divert is removed that unit of demand is removed regardless of existing
// bindings still remaining on the divert forwards. If the divert demand was the
// only thing keeping the federated address consumer open this will result in it
// being closed.
try {
tryRemoveDemandOnAddress(remoteConsumers.get(divert.getAddress().toString()), divert);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divert.getDivert().getForwardAddress(), e);
}
}
}
}
}
protected final void tryRemoveDemandOnAddress(AddressInfo addressInfo) {
final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo);
final FederationConsumerEntry entry = remoteConsumers.get(consumerInfo);
protected final void tryRemoveDemandOnAddress(FederationAddressEntry entry, Binding binding) {
if (entry != null) {
entry.removeDenamd(binding);
if (entry != null && entry.reduceDemand()) {
final FederationConsumerInternal federationConsuner = entry.getConsumer();
logger.trace("Reducing demand on federated address {}, remaining demand? {}", entry.getAddress(), entry.hasDemand());
try {
signalBeforeCloseFederationConsumer(federationConsuner);
federationConsuner.close();
signalAfterCloseFederationConsumer(federationConsuner);
} finally {
remoteConsumers.remove(consumerInfo);
if (!entry.hasDemand()) {
final FederationConsumerInternal federationConsuner = entry.getConsumer();
try {
signalBeforeCloseFederationConsumer(federationConsuner);
federationConsuner.close();
signalAfterCloseFederationConsumer(federationConsuner);
} finally {
remoteConsumers.remove(entry.getAddress());
}
}
}
}
@ -199,17 +186,61 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
protected final void scanAllBindings() {
server.getPostOffice()
.getAllBindings()
.filter(bind -> bind instanceof QueueBinding || (policy.isEnableDivertBindings() && bind instanceof DivertBinding))
.forEach(bind -> checkBindingForMatch(bind));
.filter(binding -> binding instanceof QueueBinding || (policy.isEnableDivertBindings() && binding instanceof DivertBinding))
.forEach(binding -> afterAddBinding(binding));
}
@Override
public synchronized void afterAddAddress(AddressInfo addressInfo, boolean reload) {
if (started && policy.isEnableDivertBindings() && policy.test(addressInfo)) {
try {
// A Divert can exist in configuration prior to the address having been auto created
// etc so upon address add this check needs to be run to capture addresses that now
// match the divert.
server.getPostOffice()
.getDirectBindings(addressInfo.getName())
.stream().filter(binding -> binding instanceof DivertBinding)
.forEach(this::checkBindingForMatch);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e);
}
}
}
@Override
public synchronized void afterAddBinding(Binding binding) {
if (started) {
checkBindingForMatch(binding);
}
}
/**
* Called under lock this method should check if the given {@link Binding} matches the
* configured address federation policy and federate the address if so. The incoming
* {@link Binding} can be either a {@link QueueBinding} or a {@link DivertBinding} so
* the code should check both.
*
* @param binding
* The binding that should be checked against the federated address policy,
*/
protected final void checkBindingForMatch(Binding binding) {
if (binding instanceof QueueBinding) {
final QueueBinding queueBinding = (QueueBinding) binding;
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
reactIfBindingMatchesPolicy(addressInfo, queueBinding);
reactIfQueueBindingMatchesAnyDivertTarget(queueBinding);
if (testIfAddressMatchesPolicy(addressInfo)) {
// A plugin can block address federation for a given queue and if another queue
// binding does trigger address federation we don't want to track the rejected
// queue as demand so we always run this check before trying to create the address
// consumer.
if (isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
return;
}
createOrUpdateFederatedAddressConsumerForBinding(addressInfo, queueBinding);
} else {
reactIfQueueBindingMatchesAnyDivertTarget(queueBinding);
}
} else if (binding instanceof DivertBinding) {
reactIfAnyQueueBindingMatchesDivertTarget((DivertBinding) binding);
}
@ -229,7 +260,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
// We only need to check if we've never seen the divert before, afterwards we will
// be checking it any time a new QueueBinding is added instead.
if (matchingDiverts.get(divertBinding) == null) {
final Set<SimpleString> matchingQueues = new HashSet<>();
final Set<QueueBinding> matchingQueues = new HashSet<>();
matchingDiverts.put(divertBinding, matchingQueues);
// We must account for the composite divert case by splitting the address and
@ -240,16 +271,25 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
try {
for (SimpleString forward : forwardAddresses) {
server.getPostOffice().getBindingsForAddress(forward).getBindings()
.stream().filter(b -> b instanceof QueueBinding)
.stream()
.filter(b -> b instanceof QueueBinding)
.map(b -> (QueueBinding) b)
.forEach(queueBinding -> {
// The plugin can block the demand totally here either based on the divert itself
// or the queue that's attached to the divert.
if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) {
return;
}
if (reactIfBindingMatchesPolicy(addressInfo, queueBinding)) {
matchingQueues.add(queueBinding.getQueue().getName());
// The plugin can block the demand selectively based on a single queue attached to
// the divert target(s).
if (isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
return;
}
matchingQueues.add(queueBinding);
createOrUpdateFederatedAddressConsumerForBinding(addressInfo, divertBinding);
});
}
} catch (Exception e) {
@ -264,7 +304,6 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
}
final SimpleString queueAddress = queueBinding.getAddress();
final SimpleString queueName = queueBinding.getQueue().getName();
matchingDiverts.entrySet().forEach((e) -> {
final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress();
@ -274,28 +313,35 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
// addresses (composite diverts) of the Divert and if so then we can check if we need
// to create demand on the source address on the remote if we haven't done so already.
if (!e.getValue().contains(queueName) && isAddressInDivertForwards(queueAddress, forwardAddress)) {
if (!e.getValue().contains(queueBinding) && isAddressInDivertForwards(queueAddress, forwardAddress)) {
// The plugin can block the demand totally here either based on the divert itself
// or the queue that's attached to the divert.
if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())) {
return;
}
// The plugin can block the demand selectively based on a single queue attached to
// the divert target(s).
if (isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
return;
}
// Each divert that forwards to the address the queue is bound to we add demand
// in the diverts tracker.
e.getValue().add(queueBinding);
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());
// We know it matches address policy at this point but we don't yet know if any other
// remote demand exists and we want to check here if the react method did indeed add
// demand on the address and if so add this queue into the diverts matching queues set.
if (reactIfBindingMatchesPolicy(addressInfo, queueBinding)) {
e.getValue().add(queueName);
}
createOrUpdateFederatedAddressConsumerForBinding(addressInfo, divertBinding);
}
});
}
private static boolean isAddressInDivertForwards(final SimpleString queueAddress, final SimpleString forwardAddress) {
private static boolean isAddressInDivertForwards(final SimpleString targetAddress, final SimpleString forwardAddress) {
final SimpleString[] forwardAddresses = forwardAddress.split(',');
for (SimpleString forward : forwardAddresses) {
if (queueAddress.equals(forward)) {
if (targetAddress.equals(forward)) {
return true;
}
}
@ -303,60 +349,45 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
return false;
}
protected final boolean reactIfBindingMatchesPolicy(AddressInfo address, QueueBinding binding) {
if (testIfAddressMatchesPolicy(address)) {
logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", address, binding);
protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo address, Binding binding) {
logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", address, binding);
// Check for existing consumer add demand from a additional local consumer to ensure
// the remote consumer remains active until all local demand is withdrawn. The federation
// plugin can block creation of the federation consumer at this stage.
if (remoteConsumers.containsKey(address.getName().toString())) {
logger.trace("Federation Address Policy manager found existing demand for address: {}, adding demand", address);
remoteConsumers.get(address.getName().toString()).addDemand(binding);
} else if (!isPluginBlockingFederationConsumerCreate(address)) {
logger.trace("Federation Address Policy manager creating remote consumer for address: {}", address);
final FederationConsumerInfo consumerInfo = createConsumerInfo(address);
final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo);
final FederationAddressEntry entry = createConsumerEntry(queueConsumer);
// Check for existing consumer add demand from a additional local consumer
// to ensure the remote consumer remains active until all local demand is
// withdrawn.
if (remoteConsumers.containsKey(consumerInfo)) {
logger.trace("Federation Address Policy manager found existing demand for address: {}", address);
remoteConsumers.get(consumerInfo).addDemand();
} else {
if (isPluginBlockingFederationConsumerCreate(address)) {
return false;
}
signalBeforeCreateFederationConsumer(consumerInfo);
if (isPluginBlockingFederationConsumerCreate(binding.getQueue())) {
return false;
}
logger.trace("Federation Address Policy manager creating remote consumer for address: {}", address);
signalBeforeCreateFederationConsumer(consumerInfo);
final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo);
final FederationConsumerEntry entry = createConsumerEntry(queueConsumer);
// Handle remote close with remove of consumer which means that future demand will
// attempt to create a new consumer for that demand. Ensure that thread safety is
// accounted for here as the notification can be asynchronous.
queueConsumer.setRemoteClosedHandler((closedConsumer) -> {
synchronized (this) {
try {
remoteConsumers.remove(closedConsumer.getConsumerInfo());
} finally {
closedConsumer.close();
}
// Handle remote close with remove of consumer which means that future demand will
// attempt to create a new consumer for that demand. Ensure that thread safety is
// accounted for here as the notification can be asynchronous.
queueConsumer.setRemoteClosedHandler((closedConsumer) -> {
synchronized (this) {
try {
remoteConsumers.remove(closedConsumer.getConsumerInfo().getAddress());
} finally {
closedConsumer.close();
}
});
}
});
// Called under lock so state should stay in sync
remoteConsumers.put(consumerInfo, entry);
// Called under lock so state should stay in sync
remoteConsumers.put(entry.getAddress(), entry.addDemand(binding));
// Now that we are tracking it we can start it
queueConsumer.start();
// Now that we are tracking it we can start it
queueConsumer.start();
signalAfterCreateFederationConsumer(queueConsumer);
}
return true;
signalAfterCreateFederationConsumer(queueConsumer);
}
return false;
}
/**
@ -386,17 +417,18 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
protected abstract FederationConsumerInfo createConsumerInfo(AddressInfo address);
/**
* Creates a {@link FederationConsumerEntry} instance that will be used to store a {@link FederationConsumer}
* along with other state data needed to manage a federation consumer instance. A subclass can override
* this method to return a more customized entry type with additional state data.
* Creates a {@link FederationAddressEntry} instance that will be used to store an instance of
* an {@link FederationConsumer} along with other state data needed to manage a federation consumer
* instance lifetime. A subclass can override this method to return a more customized entry type with
* additional state data.
*
* @param consumer
* The {@link FederationConsumerInternal} instance that will be housed in this entry.
*
* @return a new {@link FederationConsumerEntry} that holds the given federation consumer.
* @return a new {@link FederationAddressEntry} that holds the given federation consumer.
*/
protected FederationConsumerEntry createConsumerEntry(FederationConsumerInternal consumer) {
return new FederationConsumerEntry(consumer);
protected FederationAddressEntry createConsumerEntry(FederationConsumerInternal consumer) {
return new FederationAddressEntry(consumer);
}
/**

View File

@ -17,6 +17,11 @@
package org.apache.activemq.artemis.protocol.amqp.federation.internal;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.core.server.ServerConsumer;
/**
* Am entry type class used to hold a {@link FederationConsumerInternal} and
* any other state data needed by the manager that is creating them based on the
@ -28,19 +33,19 @@ package org.apache.activemq.artemis.protocol.amqp.federation.internal;
* on a federation resource such that it is not torn down until all demand has been
* removed from the local resource.
*/
public class FederationConsumerEntry {
public class FederationQueueEntry {
private final FederationConsumerInternal consumer;
private int references = 1;
private final Set<String> consumerDemand = new HashSet<>();
/**
* Creates a new consumer entry with a single reference
* Creates a new queue entry with a single reference
*
* @param consumer
* The federation consumer that will be carried in this entry.
*/
public FederationConsumerEntry(FederationConsumerInternal consumer) {
public FederationQueueEntry(FederationConsumerInternal consumer) {
this.consumer = consumer;
}
@ -52,10 +57,23 @@ public class FederationConsumerEntry {
}
/**
* Add additional demand on the resource associated with this entries consumer.
* @return true if there are bindings that are mapped to this federation entry.
*/
public void addDemand() {
references++;
public boolean hasDemand() {
return !consumerDemand.isEmpty();
}
/**
* Add additional demand on the resource associated with this entries consumer.
*
* @param consumer
* The {@link ServerConsumer} that generated the demand on federated resource.
*
* @return this federation queue entry instance.
*/
public FederationQueueEntry addDemand(ServerConsumer consumer) {
consumerDemand.add(identifyConsumer(consumer));
return this;
}
/**
@ -63,9 +81,19 @@ public class FederationConsumerEntry {
* and returns true when demand reaches zero which indicates the consumer should be
* closed and the entry cleaned up.
*
* @return true if demand has fallen to zero on the resource associated with the consumer.
* @param consumer
* The {@link ServerConsumer} that generated the demand on federated resource.
*
* @return this federation queue entry instance.
*/
public boolean reduceDemand() {
return --references == 0;
public FederationQueueEntry reduceDemand(ServerConsumer consumer) {
consumerDemand.remove(identifyConsumer(consumer));
return this;
}
private static String identifyConsumer(ServerConsumer consumer) {
return consumer.getConnectionID().toString() + ":" +
consumer.getSessionID() + ":" +
consumer.getID();
}
}

View File

@ -53,7 +53,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
protected final ActiveMQServer server;
protected final Predicate<ServerConsumer> federationConsumerMatcher;
protected final FederationReceiveFromQueuePolicy policy;
protected final Map<FederationConsumerInfo, FederationConsumerEntry> remoteConsumers = new HashMap<>();
protected final Map<String, FederationQueueEntry> remoteConsumers = new HashMap<>();
protected final FederationInternal federation;
private volatile boolean started;
@ -106,12 +106,20 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
}
@Override
public synchronized void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
if (started) {
final FederationConsumerInfo consumerInfo = createConsumerInfo(consumer);
final FederationConsumerEntry entry = remoteConsumers.get(consumerInfo);
final String queueName = consumer.getQueue().getName().toString();
final FederationQueueEntry entry = remoteConsumers.get(queueName);
if (entry != null && entry.reduceDemand()) {
if (entry == null) {
return;
}
entry.reduceDemand(consumer);
logger.trace("Reducing demand on federated queue {}, remaining demand? {}", queueName, entry.hasDemand());
if (!entry.hasDemand()) {
final FederationConsumerInternal federationConsuner = entry.getConsumer();
try {
@ -119,7 +127,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
federationConsuner.close();
signalAfterCloseFederationConsumer(federationConsuner);
} finally {
remoteConsumers.remove(consumerInfo);
remoteConsumers.remove(queueName);
}
}
}
@ -137,7 +145,8 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
queue.getConsumers()
.stream()
.filter(consumer -> consumer instanceof ServerConsumer)
.map(c -> (ServerConsumer) c).forEach(this::reactIfConsumerMatchesPolicy);
.map(c -> (ServerConsumer) c)
.forEach(this::reactIfConsumerMatchesPolicy);
}
protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
@ -160,13 +169,16 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
// Check for existing consumer add demand from a additional local consumer
// to ensure the remote consumer remains active until all local demand is
// withdrawn.
if (remoteConsumers.containsKey(consumerInfo)) {
remoteConsumers.get(consumerInfo).addDemand();
if (remoteConsumers.containsKey(consumerInfo.getQueueName())) {
logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", consumerInfo.getQueueName());
remoteConsumers.get(consumerInfo.getQueueName()).addDemand(consumer);
} else {
logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", consumerInfo.getQueueName());
signalBeforeCreateFederationConsumer(consumerInfo);
final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo);
final FederationConsumerEntry entry = createConsumerEntry(queueConsumer);
final FederationQueueEntry entry = createServerConsumerEntry(queueConsumer).addDemand(consumer);
// Handle remote close with remove of consumer which means that future demand will
// attempt to create a new consumer for that demand. Ensure that thread safety is
@ -174,7 +186,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
queueConsumer.setRemoteClosedHandler((closedConsumer) -> {
synchronized (this) {
try {
remoteConsumers.remove(closedConsumer.getConsumerInfo());
remoteConsumers.remove(closedConsumer.getConsumerInfo().getQueueName());
} finally {
closedConsumer.close();
}
@ -182,7 +194,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
});
// Called under lock so state should stay in sync
remoteConsumers.put(consumerInfo, entry);
remoteConsumers.put(consumerInfo.getQueueName(), entry);
// Now that we are tracking it we can start it
queueConsumer.start();
@ -224,17 +236,18 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
}
/**
* Creates a {@link FederationConsumerEntry} instance that will be used to store a {@link FederationConsumer}
* along with other state data needed to manage a federation consumer instance. A subclass can override
* this method to return a more customized entry type with additional state data.
* Creates a {@link FederationQueueEntry} instance that will be used to store an instance of
* a {@link FederationConsumer} along with other state data needed to manage a federation consumer
* instance. A subclass can override this method to return a more customized entry type with additional
* state data.
*
* @param consumer
* The {@link FederationConsumerInternal} instance that will be housed in this entry.
*
* @return a new {@link FederationConsumerEntry} that holds the given federation consumer.
* @return a new {@link FederationQueueEntry} that holds the given federation consumer.
*/
protected FederationConsumerEntry createConsumerEntry(FederationConsumerInternal consumer) {
return new FederationConsumerEntry(consumer);
protected FederationQueueEntry createServerConsumerEntry(FederationConsumerInternal consumer) {
return new FederationQueueEntry(consumer);
}
/**

View File

@ -54,6 +54,12 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@ -66,6 +72,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -77,9 +84,14 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@ -180,14 +192,15 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
// Should be no frames generated as we already federated the address and the staticly added
// Should be no frames generated as we already federated the address and the statically added
// queue should retain demand when this consumer leaves.
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createTopic("test"));
session.createConsumer(session.createTopic("test"));
session.createConsumer(session.createTopic("test"));
connection.start();
consumer.close();
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
@ -2265,6 +2278,420 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 20000)
public void testFederationStartedTriggersRemoteDemandWithExistingAddressBindings() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes("test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setAutostart(false);
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.MULTICAST)
.setAddress("test")
.setAutoCreated(false));
Wait.assertTrue(() -> server.queueQuery(SimpleString.toSimpleString("test")).isExists());
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
// Create demand on the addresses so that on start federation should happen
final Connection connection = factory.createConnection();
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTopic("test"));
session.createConsumer(session.createTopic("test"));
// Add other non-federation address bindings for the policy to check on start.
session.createConsumer(session.createTopic("a1"));
session.createConsumer(session.createTopic("a2"));
connection.start();
// Should be no interactions at this point, check to make sure.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString("sample-federation"),
containsString("test"),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
// Starting the broker connection should trigger federation of address with demand.
server.getBrokerConnections().forEach(c -> {
try {
c.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Add more demand while federation is starting
session.createConsumer(session.createTopic("test"));
session.createConsumer(session.createTopic("test"));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// This removes the connection demand, but leaves behind the static queue
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// This should trigger the federation consumer to be shutdown as the statically defined queue
// should be the only remaining demand on the address.
logger.info("Removing Queues from federated address to eliminate demand");
server.destroyQueue(SimpleString.toSimpleString("test"));
Wait.assertFalse(() -> server.queueQuery(SimpleString.toSimpleString("test")).isExists());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
@Test(timeout = 20000)
public void testFederationStartedTriggersRemoteDemandWithExistingAddressAndDivertBindings() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes("test");
receiveFromAddress.setEnableDivertBindings(true);
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setAutostart(false);
amqpConnection.addElement(element);
final DivertConfiguration divert = new DivertConfiguration();
divert.setName("test-divert");
divert.setAddress("test");
divert.setExclusive(false);
divert.setForwardingAddress("target");
divert.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
// Configure addresses and divert for the test
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target"), RoutingType.MULTICAST));
server.deployDivert(divert);
// Create demand on the addresses so that on start federation should happen
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
final Connection connection = factory.createConnection();
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic test = session.createTopic("test");
final Topic target = session.createTopic("target");
session.createConsumer(test);
session.createConsumer(test);
session.createConsumer(target);
session.createConsumer(target);
// Add other non-federation address bindings for the policy to check on start.
session.createConsumer(session.createTopic("a1"));
session.createConsumer(session.createTopic("a2"));
connection.start();
// Should be no interactions at this point, check to make sure.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString("sample-federation"),
containsString("test"),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
// Starting the broker connection should trigger federation of address with demand.
server.getBrokerConnections().forEach(c -> {
try {
c.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Add more demand while federation is starting
session.createConsumer(test);
session.createConsumer(target);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// This removes the connection demand, but leaves behind the static queue
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
@Test(timeout = 20000)
public void testFederationStartTriggersFederationWithMultipleDivertsAndRemainsActiveAfterOneRemoved() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes("test");
receiveFromAddress.setEnableDivertBindings(true);
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setAutostart(false);
amqpConnection.addElement(element);
final DivertConfiguration divert1 = new DivertConfiguration();
divert1.setName("test-divert-1");
divert1.setAddress("test");
divert1.setExclusive(false);
divert1.setForwardingAddress("target1,target2");
divert1.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
final DivertConfiguration divert2 = new DivertConfiguration();
divert2.setName("test-divert-2");
divert2.setAddress("test");
divert2.setExclusive(false);
divert2.setForwardingAddress("target1,target3");
divert2.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
// Configure addresses and divert for the test
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target1"), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target2"), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target3"), RoutingType.MULTICAST));
server.deployDivert(divert1);
server.deployDivert(divert2);
// Create demand on the addresses so that on start federation should happen
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
final Connection connection = factory.createConnection();
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic target1 = session.createTopic("target1");
final Topic target2 = session.createTopic("target2");
final Topic target3 = session.createTopic("target2");
session.createConsumer(target1);
session.createConsumer(target2);
session.createConsumer(target3);
connection.start();
// Should be no interactions at this point, check to make sure.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString("sample-federation"),
containsString("test"),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
// Starting the broker connection should trigger federation of address with demand.
server.getBrokerConnections().forEach(c -> {
try {
c.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Add more demand while federation is starting
session.createConsumer(target1);
session.createConsumer(target2);
session.createConsumer(target3);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
server.destroyDivert(SimpleString.toSimpleString(divert1.getName()));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
server.destroyDivert(SimpleString.toSimpleString(divert2.getName()));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
connection.close();
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
@Test(timeout = 20000)
public void testFederationPluginCanLimitDemandToOnlyTheConfiguredDivert() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes("test");
receiveFromAddress.setEnableDivertBindings(true);
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("test-address-federation", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(5);
amqpConnection.addElement(element);
final DivertConfiguration divert = new DivertConfiguration();
divert.setName("test-divert-1");
divert.setAddress("test");
divert.setExclusive(false);
divert.setForwardingAddress("target");
divert.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
final AMQPTestFederationBrokerPlugin federationPlugin = new AMQPTestFederationBrokerPlugin();
federationPlugin.shouldCreateConsumerForDivert = (d, q) -> true;
federationPlugin.shouldCreateConsumerForQueue = (q) -> {
// Disallow any binding on the source address from creating federation demand
// any other binding that matches the policy will create demand for federation.
if (q.getAddress().toString().equals("test")) {
return false;
}
return true;
};
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
// Configure addresses and divert for the test
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("target"), RoutingType.MULTICAST));
server.deployDivert(divert);
server.registerBrokerPlugin(federationPlugin);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
// Should be ignored as plugin rejects this binding as demand.
final MessageConsumer consumer = session.createConsumer(session.createTopic("test"));
connection.start();
// Get a round trip to the broker to allow time for federation to hopefully
// reject this first consumer on the source address.
consumer.receiveNoWait();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString("sample-federation"),
containsString("test"),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
session.createConsumer(session.createTopic("target"));
// Now a federation receiver should get created for the divert demand.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}
public static class ApplicationPropertiesTransformer implements Transformer {
private final Map<String, String> properties = new HashMap<>();
@ -2372,4 +2799,82 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
.withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
peer.expectFlow();
}
private class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin {
public final AtomicBoolean started = new AtomicBoolean();
public final AtomicBoolean stopped = new AtomicBoolean();
public final AtomicReference<FederationConsumerInfo> beforeCreateConsumerCapture = new AtomicReference<>();
public final AtomicReference<FederationConsumer> afterCreateConsumerCapture = new AtomicReference<>();
public final AtomicReference<FederationConsumer> beforeCloseConsumerCapture = new AtomicReference<>();
public final AtomicReference<FederationConsumer> afterCloseConsumerCapture = new AtomicReference<>();
public Consumer<FederationConsumerInfo> beforeCreateConsumer = (c) -> beforeCreateConsumerCapture.set(c);;
public Consumer<FederationConsumer> afterCreateConsumer = (c) -> afterCreateConsumerCapture.set(c);
public Consumer<FederationConsumer> beforeCloseConsumer = (c) -> beforeCloseConsumerCapture.set(c);
public Consumer<FederationConsumer> afterCloseConsumer = (c) -> afterCloseConsumerCapture.set(c);
public BiConsumer<FederationConsumer, org.apache.activemq.artemis.api.core.Message> beforeMessageHandled = (c, m) -> { };
public BiConsumer<FederationConsumer, org.apache.activemq.artemis.api.core.Message> afterMessageHandled = (c, m) -> { };
public Function<AddressInfo, Boolean> shouldCreateConsumerForAddress = (a) -> true;
public Function<org.apache.activemq.artemis.core.server.Queue, Boolean> shouldCreateConsumerForQueue = (q) -> true;
public BiFunction<Divert, org.apache.activemq.artemis.core.server.Queue, Boolean> shouldCreateConsumerForDivert = (d, q) -> true;
@Override
public void federationStarted(final Federation federation) throws ActiveMQException {
started.set(true);
}
@Override
public void federationStopped(final Federation federation) throws ActiveMQException {
stopped.set(true);
}
@Override
public void beforeCreateFederationConsumer(final FederationConsumerInfo consumerInfo) throws ActiveMQException {
beforeCreateConsumer.accept(consumerInfo);
}
@Override
public void afterCreateFederationConsumer(final FederationConsumer consumer) throws ActiveMQException {
afterCreateConsumer.accept(consumer);
}
@Override
public void beforeCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException {
beforeCloseConsumer.accept(consumer);
}
@Override
public void afterCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException {
afterCloseConsumer.accept(consumer);
}
@Override
public void beforeFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException {
beforeMessageHandled.accept(consumer, message);
}
@Override
public void afterFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException {
afterMessageHandled.accept(consumer, message);
}
@Override
public boolean shouldCreateFederationConsumerForAddress(final AddressInfo address) throws ActiveMQException {
return shouldCreateConsumerForAddress.apply(address);
}
@Override
public boolean shouldCreateFederationConsumerForQueue(final org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException {
return shouldCreateConsumerForQueue.apply(queue);
}
@Override
public boolean shouldCreateFederationConsumerForDivert(Divert divert, org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException {
return shouldCreateConsumerForDivert.apply(divert, queue);
}
}
}

View File

@ -2600,6 +2600,93 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 20000)
public void testFederationCreatesQueueReceiverLinkForQueueAfterBrokerConnectionStarted() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("test", "test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setAutostart(false);
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
final Connection connection = factory.createConnection();
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("test");
session.createConsumer(queue);
session.createConsumer(queue);
session.createConsumer(queue);
connection.start();
// Should be no interactions yet, check to be sure
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
server.getBrokerConnections().forEach(c -> {
try {
c.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Add another demand consumer immediately after start to add more events
// on the federation plugin for broker events.
session.createConsumer(queue);
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString("sample-federation"),
containsString("test::test"),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
.respond()
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// Add more demand after the broker connection starts
session.createConsumer(queue);
session.createConsumer(queue);
// This should remove all demand on the queue and federation should stop
connection.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
public static class ApplicationPropertiesTransformer implements Transformer {
private final Map<String, String> properties = new HashMap<>();