ARTEMIS-4642 Fix tracked federated demand when links are rejected or blocked

Under some scenarios federation demand tracking is losing track of total demand
for a federated resource leading to teardown of federated links before all local
demand has been removed from the resource. This occurs most often if the attempts
to establish a federation link are refused because the resource hasn't yet been
created and an eventual attach succeeds, but can also occur in combination with
a plugin blocking or not blocking federation link creation in some cases.
This commit is contained in:
Timothy Bish 2024-02-15 12:02:06 -05:00 committed by Robbie Gemmell
parent 42a2e4637f
commit b8800337df
10 changed files with 772 additions and 140 deletions

View File

@ -165,7 +165,7 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
if (started) { if (started) {
started = false; started = false;
connection.runLater(() -> { connection.runLater(() -> {
federation.removeLinkClosedInterceptor(consumerInfo.getFqqn()); federation.removeLinkClosedInterceptor(consumerInfo.getId());
if (receiver != null) { if (receiver != null) {
try { try {
@ -350,7 +350,7 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
// Intercept remote close and check for valid reasons for remote closure such as // Intercept remote close and check for valid reasons for remote closure such as
// the remote peer not having a matching queue for this subscription or from an // the remote peer not having a matching queue for this subscription or from an
// operator manually closing the link. // operator manually closing the link.
federation.addLinkClosedInterceptor(consumerInfo.getFqqn(), remoteCloseInterceptor); federation.addLinkClosedInterceptor(consumerInfo.getId(), remoteCloseInterceptor);
receiver = new AMQPFederatedAddressDeliveryReceiver(session, consumerInfo, protonReceiver); receiver = new AMQPFederatedAddressDeliveryReceiver(session, consumerInfo, protonReceiver);

View File

@ -162,7 +162,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
if (started) { if (started) {
started = false; started = false;
connection.runLater(() -> { connection.runLater(() -> {
federation.removeLinkClosedInterceptor(consumerInfo.getFqqn()); federation.removeLinkClosedInterceptor(consumerInfo.getId());
if (receiver != null) { if (receiver != null) {
try { try {
@ -341,7 +341,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
// Intercept remote close and check for valid reasons for remote closure such as // Intercept remote close and check for valid reasons for remote closure such as
// the remote peer not having a matching queue for this subscription or from an // the remote peer not having a matching queue for this subscription or from an
// operator manually closing the link. // operator manually closing the link.
federation.addLinkClosedInterceptor(consumerInfo.getFqqn(), remoteCloseIntercepter); federation.addLinkClosedInterceptor(consumerInfo.getId(), remoteCloseIntercepter);
receiver = new AMQPFederatedQueueDeliveryReceiver(localQueue, protonReceiver); receiver = new AMQPFederatedQueueDeliveryReceiver(localQueue, protonReceiver);

View File

@ -38,6 +38,11 @@ public interface FederationConsumerInfo {
QUEUE_CONSUMER QUEUE_CONSUMER
} }
/**
* @return a unique Id for the consumer being represented.
*/
String getId();
/** /**
* @return the type of federation consumer being represented. * @return the type of federation consumer being represented.
*/ */

View File

@ -18,42 +18,59 @@
package org.apache.activemq.artemis.protocol.amqp.federation.internal; package org.apache.activemq.artemis.protocol.amqp.federation.internal;
import java.util.HashSet; import java.util.HashSet;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
/** /**
* An entry type class used to hold a {@link FederationConsumerInternal} and * An entry type class used to hold a {@link FederationConsumerInternal} and
* any other state data needed by the manager that is creating them based on the * any other state data needed by the manager that is creating them based on the
* policy configuration for the federation instance. The entry can be extended * policy configuration for the federation instance. The entry can be extended
* by federation implementation to hold additional state data for the federation * by federation implementation to hold additional state data for the federation
* consumer and the managing of its lifetime. * consumer and the managing active demand.
* *
* This entry type provides a reference counter that can be used to register demand * This entry type provides reference tracking state for current demand (bindings)
* on a federation resource such that it is not torn down until all demand has been * on a federation resource such that it is not torn down until all demand has been
* removed from the local resource. * removed from the local resource.
*/ */
public class FederationAddressEntry { public class FederationAddressEntry {
private final FederationConsumerInternal consumer; private final AddressInfo addressInfo;
private final Set<Binding> demandBindings = new HashSet<>(); private final Set<Binding> demandBindings = new HashSet<>();
private FederationConsumerInternal consumer;
/** /**
* Creates a new address entry with a single reference * Creates a new address entry for tracking demand on a federated address
* *
* @param consumer * @param addressInfo
* The federation consumer that will be carried in this entry. * The address information object that this entry hold demand state for.
*/ */
public FederationAddressEntry(FederationConsumerInternal consumer) { public FederationAddressEntry(AddressInfo addressInfo) {
this.consumer = consumer; this.addressInfo = addressInfo;
}
/**
* @return the address information that this entry is acting to federate.
*/
public AddressInfo getAddressInfo() {
return addressInfo;
} }
/** /**
* @return the address that this entry is acting to federate. * @return the address that this entry is acting to federate.
*/ */
public String getAddress() { public String getAddress() {
return consumer.getConsumerInfo().getAddress(); return addressInfo.getName().toString();
}
/**
* @return <code>true</code> if a consumer is currently set on this entry.
*/
public boolean hasConsumer() {
return consumer != null;
} }
/** /**
@ -63,6 +80,30 @@ public class FederationAddressEntry {
return consumer; return consumer;
} }
/**
* Sets the consumer assigned to this entry to the given instance.
*
* @param consumer
* The federation consumer that is currently active for this entry.
*
* @return this federation address consumer entry.
*/
public FederationAddressEntry setConsumer(FederationConsumerInternal consumer) {
Objects.requireNonNull(consumer, "Cannot assign a null consumer to this entry, call clear to unset");
this.consumer = consumer;
return this;
}
/**
* Clears the currently assigned consumer from this entry.
*
* @return this federation address consumer entry.
*/
public FederationAddressEntry clearConsumer() {
this.consumer = null;
return this;
}
/** /**
* @return true if there are bindings that are mapped to this federation entry. * @return true if there are bindings that are mapped to this federation entry.
*/ */
@ -85,7 +126,7 @@ public class FederationAddressEntry {
* *
* @return this federation address consumer entry. * @return this federation address consumer entry.
*/ */
public FederationAddressEntry removeDenamd(Binding binding) { public FederationAddressEntry removeDemand(Binding binding) {
demandBindings.remove(binding); demandBindings.remove(binding);
return this; return this;
} }

View File

@ -64,8 +64,8 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
protected final ActiveMQServer server; protected final ActiveMQServer server;
protected final FederationInternal federation; protected final FederationInternal federation;
protected final FederationReceiveFromAddressPolicy policy; protected final FederationReceiveFromAddressPolicy policy;
protected final Map<String, FederationAddressEntry> remoteConsumers = new HashMap<>(); protected final Map<String, FederationAddressEntry> demandTracking = new HashMap<>();
protected final Map<DivertBinding, Set<QueueBinding>> matchingDiverts = new HashMap<>(); protected final Map<DivertBinding, Set<QueueBinding>> divertsTracking = new HashMap<>();
private volatile boolean started; private volatile boolean started;
@ -101,9 +101,24 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
if (started) { if (started) {
started = false; started = false;
server.unRegisterBrokerPlugin(this); server.unRegisterBrokerPlugin(this);
remoteConsumers.forEach((k, v) -> v.getConsumer().close()); // Cleanup and recreate if ever reconnected. demandTracking.forEach((k, v) -> {
remoteConsumers.clear(); if (v.hasConsumer()) {
matchingDiverts.clear(); v.getConsumer().close();
}
});
demandTracking.clear();
divertsTracking.clear();
}
}
@Override
public synchronized void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
if (started) {
final FederationAddressEntry entry = demandTracking.remove(address.toString());
if (entry != null && entry.hasConsumer()) {
entry.getConsumer().close();
}
} }
} }
@ -111,7 +126,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException { public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException {
if (started) { if (started) {
if (binding instanceof QueueBinding) { if (binding instanceof QueueBinding) {
final FederationAddressEntry entry = remoteConsumers.get(binding.getAddress().toString()); final FederationAddressEntry entry = demandTracking.get(binding.getAddress().toString());
if (entry != null) { if (entry != null) {
// This is QueueBinding that was mapped to a federated address so we can directly remove // This is QueueBinding that was mapped to a federated address so we can directly remove
@ -122,7 +137,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
// is bound and remove the mapping for any matches, diverts can have a composite set of address // is bound and remove the mapping for any matches, diverts can have a composite set of address
// forwards so each divert must be checked in turn to see if it contains the address the removed // forwards so each divert must be checked in turn to see if it contains the address the removed
// binding was bound to. // binding was bound to.
matchingDiverts.entrySet().forEach(divertEntry -> { divertsTracking.entrySet().forEach(divertEntry -> {
final String sourceAddress = divertEntry.getKey().getAddress().toString(); final String sourceAddress = divertEntry.getKey().getAddress().toString();
final SimpleString forwardAddress = divertEntry.getKey().getDivert().getForwardAddress(); final SimpleString forwardAddress = divertEntry.getKey().getDivert().getForwardAddress();
@ -134,7 +149,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
divertEntry.getValue().remove(binding); divertEntry.getValue().remove(binding);
if (divertEntry.getValue().isEmpty()) { if (divertEntry.getValue().isEmpty()) {
tryRemoveDemandOnAddress(remoteConsumers.get(sourceAddress), divertEntry.getKey()); tryRemoveDemandOnAddress(demandTracking.get(sourceAddress), divertEntry.getKey());
} }
} }
}); });
@ -142,14 +157,14 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
} else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) { } else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) {
final DivertBinding divert = (DivertBinding) binding; final DivertBinding divert = (DivertBinding) binding;
if (matchingDiverts.remove(divert) != null) { if (divertsTracking.remove(divert) != null) {
// The divert binding is treated as one unit of demand on a federated address and // The divert binding is treated as one unit of demand on a federated address and
// when the divert is removed that unit of demand is removed regardless of existing // when the divert is removed that unit of demand is removed regardless of existing
// bindings still remaining on the divert forwards. If the divert demand was the // bindings still remaining on the divert forwards. If the divert demand was the
// only thing keeping the federated address consumer open this will result in it // only thing keeping the federated address consumer open this will result in it
// being closed. // being closed.
try { try {
tryRemoveDemandOnAddress(remoteConsumers.get(divert.getAddress().toString()), divert); tryRemoveDemandOnAddress(demandTracking.get(divert.getAddress().toString()), divert);
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divert.getDivert().getForwardAddress(), e); ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divert.getDivert().getForwardAddress(), e);
} }
@ -160,11 +175,11 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
protected final void tryRemoveDemandOnAddress(FederationAddressEntry entry, Binding binding) { protected final void tryRemoveDemandOnAddress(FederationAddressEntry entry, Binding binding) {
if (entry != null) { if (entry != null) {
entry.removeDenamd(binding); entry.removeDemand(binding);
logger.trace("Reducing demand on federated address {}, remaining demand? {}", entry.getAddress(), entry.hasDemand()); logger.trace("Reducing demand on federated address {}, remaining demand? {}", entry.getAddress(), entry.hasDemand());
if (!entry.hasDemand()) { if (!entry.hasDemand() && entry.hasConsumer()) {
final FederationConsumerInternal federationConsuner = entry.getConsumer(); final FederationConsumerInternal federationConsuner = entry.getConsumer();
try { try {
@ -172,7 +187,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
federationConsuner.close(); federationConsuner.close();
signalAfterCloseFederationConsumer(federationConsuner); signalAfterCloseFederationConsumer(federationConsuner);
} finally { } finally {
remoteConsumers.remove(entry.getAddress()); demandTracking.remove(entry.getAddress());
} }
} }
} }
@ -200,7 +215,8 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
// match the divert. // match the divert.
server.getPostOffice() server.getPostOffice()
.getDirectBindings(addressInfo.getName()) .getDirectBindings(addressInfo.getName())
.stream().filter(binding -> binding instanceof DivertBinding) .stream()
.filter(binding -> binding instanceof DivertBinding)
.forEach(this::checkBindingForMatch); .forEach(this::checkBindingForMatch);
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e); ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e);
@ -260,9 +276,9 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
// We only need to check if we've never seen the divert before, afterwards we will // We only need to check if we've never seen the divert before, afterwards we will
// be checking it any time a new QueueBinding is added instead. // be checking it any time a new QueueBinding is added instead.
if (matchingDiverts.get(divertBinding) == null) { if (divertsTracking.get(divertBinding) == null) {
final Set<QueueBinding> matchingQueues = new HashSet<>(); final Set<QueueBinding> matchingQueues = new HashSet<>();
matchingDiverts.put(divertBinding, matchingQueues); divertsTracking.put(divertBinding, matchingQueues);
// We must account for the composite divert case by splitting the address and // We must account for the composite divert case by splitting the address and
// getting the bindings on each one. // getting the bindings on each one.
@ -306,7 +322,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
final SimpleString queueAddress = queueBinding.getAddress(); final SimpleString queueAddress = queueBinding.getAddress();
matchingDiverts.entrySet().forEach((e) -> { divertsTracking.entrySet().forEach((e) -> {
final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress(); final SimpleString forwardAddress = e.getKey().getDivert().getForwardAddress();
final DivertBinding divertBinding = e.getKey(); final DivertBinding divertBinding = e.getKey();
@ -350,44 +366,62 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
return false; return false;
} }
protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo address, Binding binding) { protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo addressInfo, Binding binding) {
logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", address, binding); logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", addressInfo, binding);
final String addressName = addressInfo.getName().toString();
final FederationAddressEntry entry;
// Check for existing consumer add demand from a additional local consumer to ensure // Check for existing consumer add demand from a additional local consumer to ensure
// the remote consumer remains active until all local demand is withdrawn. The federation // the remote consumer remains active until all local demand is withdrawn.
// plugin can block creation of the federation consumer at this stage. if (demandTracking.containsKey(addressName)) {
if (remoteConsumers.containsKey(address.getName().toString())) { entry = demandTracking.get(addressName);
logger.trace("Federation Address Policy manager found existing demand for address: {}, adding demand", address); } else {
remoteConsumers.get(address.getName().toString()).addDemand(binding); entry = new FederationAddressEntry(addressInfo);
} else if (!isPluginBlockingFederationConsumerCreate(address)) { demandTracking.put(addressName, entry);
logger.trace("Federation Address Policy manager creating remote consumer for address: {}", address); }
final FederationConsumerInfo consumerInfo = createConsumerInfo(address); // Demand passed all binding plugin blocking checks so we track it, plugin can still
final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo); // stop federation of the address based on some external criteria but once it does
final FederationAddressEntry entry = createConsumerEntry(queueConsumer); // (if ever) allow it we will have tracked all allowed demand.
entry.addDemand(binding);
tryCreateFederationConsumerForAddress(entry);
}
private void tryCreateFederationConsumerForAddress(FederationAddressEntry addressEntry) {
final AddressInfo addressInfo = addressEntry.getAddressInfo();
if (addressEntry.hasDemand() && !addressEntry.hasConsumer() && !isPluginBlockingFederationConsumerCreate(addressInfo)) {
logger.trace("Federation Address Policy manager creating remote consumer for address: {}", addressInfo);
final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo);
final FederationConsumerInternal addressConsumer = createFederationConsumer(consumerInfo);
signalBeforeCreateFederationConsumer(consumerInfo); signalBeforeCreateFederationConsumer(consumerInfo);
// Handle remote close with remove of consumer which means that future demand will // Handle remote close with remove of consumer which means that future demand will
// attempt to create a new consumer for that demand. Ensure that thread safety is // attempt to create a new consumer for that demand. Ensure that thread safety is
// accounted for here as the notification can be asynchronous. // accounted for here as the notification can be asynchronous.
queueConsumer.setRemoteClosedHandler((closedConsumer) -> { addressConsumer.setRemoteClosedHandler((closedConsumer) -> {
synchronized (this) { synchronized (this) {
try { try {
remoteConsumers.remove(closedConsumer.getConsumerInfo().getAddress()); final FederationAddressEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo().getAddress());
if (tracked != null) {
tracked.clearConsumer();
}
} finally { } finally {
closedConsumer.close(); closedConsumer.close();
} }
} }
}); });
// Called under lock so state should stay in sync addressEntry.setConsumer(addressConsumer);
remoteConsumers.put(entry.getAddress(), entry.addDemand(binding));
// Now that we are tracking it we can start it addressConsumer.start();
queueConsumer.start();
signalAfterCreateFederationConsumer(queueConsumer); signalAfterCreateFederationConsumer(addressConsumer);
} }
} }
@ -404,21 +438,12 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
public synchronized void afterRemoteAddressAdded(String addressName) throws Exception { public synchronized void afterRemoteAddressAdded(String addressName) throws Exception {
// Assume that the remote address that matched a previous federation attempt is MULTICAST // Assume that the remote address that matched a previous federation attempt is MULTICAST
// so that we retry if current local state matches the policy and if it isn't we will once // so that we retry if current local state matches the policy and if it isn't we will once
// again record the federation attempt with the remote and but updated if the remote removes // again record the federation attempt with the remote and be updated if the remote removes
// and adds the address again (hopefully with the correct routing type). // and adds the address again (hopefully with the correct routing type). We retrain all the
// current demand and don't need to re-check the server state before trying to create the
if (started && testIfAddressMatchesPolicy(addressName, RoutingType.MULTICAST) && !remoteConsumers.containsKey(addressName)) { // remote address consumer.
final SimpleString address = SimpleString.toSimpleString(addressName); if (started && testIfAddressMatchesPolicy(addressName, RoutingType.MULTICAST) && demandTracking.containsKey(addressName)) {
tryCreateFederationConsumerForAddress(demandTracking.get(addressName));
// Need to trigger check for all bindings that match to accumulate demand on the address
// if any and ensure an outgoing consumer is attempted.
server.getPostOffice()
.getDirectBindings(address)
.stream()
.filter(binding -> binding instanceof QueueBinding ||
(policy.isEnableDivertBindings() && binding instanceof DivertBinding))
.forEach(this::checkBindingForMatch);
} }
} }
@ -470,13 +495,13 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
* instance lifetime. A subclass can override this method to return a more customized entry type with * instance lifetime. A subclass can override this method to return a more customized entry type with
* additional state data. * additional state data.
* *
* @param consumer * @param addressInfo
* The {@link FederationConsumerInternal} instance that will be housed in this entry. * The address information that the created entry is meant to track demand for.
* *
* @return a new {@link FederationAddressEntry} that holds the given federation consumer. * @return a new {@link FederationAddressEntry} that tracks demand on an address.
*/ */
protected FederationAddressEntry createConsumerEntry(FederationConsumerInternal consumer) { protected FederationAddressEntry createConsumerEntry(AddressInfo addressInfo) {
return new FederationAddressEntry(consumer); return new FederationAddressEntry(addressInfo);
} }
/** /**

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.protocol.amqp.federation.internal; package org.apache.activemq.artemis.protocol.amqp.federation.internal;
import java.util.Objects; import java.util.Objects;
import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
@ -48,6 +49,7 @@ public class FederationGenericConsumerInfo implements FederationConsumerInfo {
private final String filterString; private final String filterString;
private final String fqqn; private final String fqqn;
private final int priority; private final int priority;
private final String id;
protected FederationGenericConsumerInfo(Role role, String address, String queueName, RoutingType routingType, protected FederationGenericConsumerInfo(Role role, String address, String queueName, RoutingType routingType,
String filterString, String fqqn, int priority) { String filterString, String fqqn, int priority) {
@ -58,6 +60,7 @@ public class FederationGenericConsumerInfo implements FederationConsumerInfo {
this.filterString = filterString; this.filterString = filterString;
this.fqqn = fqqn; this.fqqn = fqqn;
this.priority = priority; this.priority = priority;
this.id = UUID.randomUUID().toString();
} }
/** /**
@ -116,6 +119,11 @@ public class FederationGenericConsumerInfo implements FederationConsumerInfo {
ActiveMQDefaultConfiguration.getDefaultConsumerPriority()); ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
} }
@Override
public String getId() {
return id;
}
@Override @Override
public Role getRole() { public Role getRole() {
return role; return role;

View File

@ -18,9 +18,11 @@
package org.apache.activemq.artemis.protocol.amqp.federation.internal; package org.apache.activemq.artemis.protocol.amqp.federation.internal;
import java.util.HashSet; import java.util.HashSet;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
/** /**
* An entry type class used to hold a {@link FederationConsumerInternal} and * An entry type class used to hold a {@link FederationConsumerInternal} and
@ -29,24 +31,46 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
* by federation implementation to hold additional state data for the federation * by federation implementation to hold additional state data for the federation
* consumer and the managing of its lifetime. * consumer and the managing of its lifetime.
* *
* This entry type provides a reference counter that can be used to register demand * This entry type provides reference tracking state for current demand (bindings)
* on a federation resource such that it is not torn down until all demand has been * on a federation resource such that it is not torn down until all demand has been
* removed from the local resource. * removed from the local resource.
*/ */
public class FederationQueueEntry { public class FederationQueueEntry {
private final FederationConsumerInternal consumer; private final FederationConsumerInfo consumerInfo;
private final Set<String> consumerDemand = new HashSet<>(); private final Set<String> consumerDemand = new HashSet<>();
private FederationConsumerInternal consumer;
/** /**
* Creates a new queue entry with a single reference * Creates a new queue entry with a single reference
* *
* @param consumer * @param consumerInfo
* The federation consumer that will be carried in this entry. * Consumer information object used to define the federation queue consumer
*/ */
public FederationQueueEntry(FederationConsumerInternal consumer) { public FederationQueueEntry(FederationConsumerInfo consumerInfo) {
this.consumer = consumer; this.consumerInfo = consumerInfo;
}
/**
* @return the name of the queue that this entry tracks demand for.
*/
public String getQueueName() {
return consumerInfo.getQueueName();
}
/**
* @return the consumer information that defines the properties of federation queue consumers
*/
public FederationConsumerInfo getConsumerInfo() {
return consumerInfo;
}
/**
* @return <code>true</code> if a consumer is currently set on this entry.
*/
public boolean hasConsumer() {
return consumer != null;
} }
/** /**
@ -56,6 +80,30 @@ public class FederationQueueEntry {
return consumer; return consumer;
} }
/**
* Sets the consumer assigned to this entry to the given instance.
*
* @param consumer
* The federation consumer that is currently active for this entry.
*
* @return this federation queue consumer entry.
*/
public FederationQueueEntry setConsumer(FederationConsumerInternal consumer) {
Objects.requireNonNull(consumer, "Cannot assign a null consumer to this entry, call clear to unset");
this.consumer = consumer;
return this;
}
/**
* Clears the currently assigned consumer from this entry.
*
* @return this federation queue consumer entry.
*/
public FederationQueueEntry clearConsumer() {
this.consumer = null;
return this;
}
/** /**
* @return true if there are bindings that are mapped to this federation entry. * @return true if there are bindings that are mapped to this federation entry.
*/ */

View File

@ -28,13 +28,16 @@ import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.federation.Federation; import org.apache.activemq.artemis.core.server.federation.Federation;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
@ -46,14 +49,14 @@ import org.slf4j.LoggerFactory;
* monitoring broker queues for demand and creating a consumer for on the remote side * monitoring broker queues for demand and creating a consumer for on the remote side
* to federate messages back to this peer. * to federate messages back to this peer.
*/ */
public abstract class FederationQueuePolicyManager implements ActiveMQServerConsumerPlugin { public abstract class FederationQueuePolicyManager implements ActiveMQServerConsumerPlugin, ActiveMQServerBindingPlugin {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final ActiveMQServer server; protected final ActiveMQServer server;
protected final Predicate<ServerConsumer> federationConsumerMatcher; protected final Predicate<ServerConsumer> federationConsumerMatcher;
protected final FederationReceiveFromQueuePolicy policy; protected final FederationReceiveFromQueuePolicy policy;
protected final Map<String, FederationQueueEntry> remoteConsumers = new HashMap<>(); protected final Map<String, FederationQueueEntry> demandTracking = new HashMap<>();
protected final FederationInternal federation; protected final FederationInternal federation;
private volatile boolean started; private volatile boolean started;
@ -93,8 +96,12 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
// broker plugin instances. // broker plugin instances.
server.unRegisterBrokerPlugin(this); server.unRegisterBrokerPlugin(this);
started = false; started = false;
remoteConsumers.forEach((k, v) -> v.getConsumer().close()); // Cleanup and recreate if ever reconnected. demandTracking.forEach((k, v) -> {
remoteConsumers.clear(); if (v.hasConsumer()) {
v.getConsumer().close();
}
});
demandTracking.clear();
} }
} }
@ -109,7 +116,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) { public synchronized void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
if (started) { if (started) {
final String queueName = consumer.getQueue().getName().toString(); final String queueName = consumer.getQueue().getName().toString();
final FederationQueueEntry entry = remoteConsumers.get(queueName); final FederationQueueEntry entry = demandTracking.get(queueName);
if (entry == null) { if (entry == null) {
return; return;
@ -119,7 +126,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
logger.trace("Reducing demand on federated queue {}, remaining demand? {}", queueName, entry.hasDemand()); logger.trace("Reducing demand on federated queue {}, remaining demand? {}", queueName, entry.hasDemand());
if (!entry.hasDemand()) { if (!entry.hasDemand() && entry.hasConsumer()) {
final FederationConsumerInternal federationConsuner = entry.getConsumer(); final FederationConsumerInternal federationConsuner = entry.getConsumer();
try { try {
@ -127,12 +134,24 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
federationConsuner.close(); federationConsuner.close();
signalAfterCloseFederationConsumer(federationConsuner); signalAfterCloseFederationConsumer(federationConsuner);
} finally { } finally {
remoteConsumers.remove(queueName); demandTracking.remove(queueName);
} }
} }
} }
} }
@Override
public synchronized void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException {
if (binding instanceof QueueBinding) {
final QueueBinding queueBinding = (QueueBinding) binding;
final FederationQueueEntry entry = demandTracking.remove(queueBinding.getQueue().getName().toString());
if (entry != null && entry.hasConsumer()) {
entry.getConsumer().close();
}
}
}
protected final void scanAllQueueBindings() { protected final void scanAllQueueBindings() {
server.getPostOffice() server.getPostOffice()
.getAllBindings() .getAllBindings()
@ -150,7 +169,9 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
} }
protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) { protected final void reactIfConsumerMatchesPolicy(ServerConsumer consumer) {
if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), consumer.getQueueName().toString())) { final String queueName = consumer.getQueue().getName().toString();
if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), queueName)) {
// We should ignore federation consumers from remote peers but configuration does allow // We should ignore federation consumers from remote peers but configuration does allow
// these to be federated again for some very specific use cases so we check before then // these to be federated again for some very specific use cases so we check before then
// moving onto any server plugin checks kick in. // moving onto any server plugin checks kick in.
@ -158,49 +179,59 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
return; return;
} }
if (isPluginBlockingFederationConsumerCreate(consumer.getQueue())) {
return;
}
logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding()); logger.trace("Federation Policy matched on consumer for binding: {}", consumer.getBinding());
final FederationConsumerInfo consumerInfo = createConsumerInfo(consumer); final FederationQueueEntry entry;
// Check for existing consumer add demand from a additional local consumer // Check for existing consumer add demand from a additional local consumer to ensure
// to ensure the remote consumer remains active until all local demand is // the remote consumer remains active until all local demand is withdrawn.
// withdrawn. if (demandTracking.containsKey(queueName)) {
if (remoteConsumers.containsKey(consumerInfo.getQueueName())) { logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", queueName);
logger.trace("Federation Queue Policy manager found existing demand for queue: {}, adding demand", consumerInfo.getQueueName()); entry = demandTracking.get(queueName);
remoteConsumers.get(consumerInfo.getQueueName()).addDemand(consumer);
} else { } else {
logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", consumerInfo.getQueueName()); demandTracking.put(queueName, entry = createConsumerEntry(createConsumerInfo(consumer)));
signalBeforeCreateFederationConsumer(consumerInfo);
final FederationConsumerInternal queueConsumer = createFederationConsumer(consumerInfo);
final FederationQueueEntry entry = createServerConsumerEntry(queueConsumer).addDemand(consumer);
// Handle remote close with remove of consumer which means that future demand will
// attempt to create a new consumer for that demand. Ensure that thread safety is
// accounted for here as the notification can be asynchronous.
queueConsumer.setRemoteClosedHandler((closedConsumer) -> {
synchronized (this) {
try {
remoteConsumers.remove(closedConsumer.getConsumerInfo().getQueueName());
} finally {
closedConsumer.close();
}
}
});
// Called under lock so state should stay in sync
remoteConsumers.put(consumerInfo.getQueueName(), entry);
// Now that we are tracking it we can start it
queueConsumer.start();
signalAfterCreateFederationConsumer(queueConsumer);
} }
// Demand passed all binding plugin blocking checks so we track it, plugin can still
// stop federation of the queue based on some external criteria but once it does
// (if ever) allow it we will have tracked all allowed demand.
entry.addDemand(consumer);
tryCreateFederationConsumerForQueue(entry, consumer.getQueue());
}
}
private void tryCreateFederationConsumerForQueue(FederationQueueEntry queueEntry, Queue queue) {
if (queueEntry.hasDemand() && !queueEntry.hasConsumer() && !isPluginBlockingFederationConsumerCreate(queue)) {
logger.trace("Federation Queue Policy manager creating remote consumer for queue: {}", queueEntry.getQueueName());
signalBeforeCreateFederationConsumer(queueEntry.getConsumerInfo());
final FederationConsumerInternal queueConsumer = createFederationConsumer(queueEntry.getConsumerInfo());
// Handle remote close with remove of consumer which means that future demand will
// attempt to create a new consumer for that demand. Ensure that thread safety is
// accounted for here as the notification can be asynchronous.
queueConsumer.setRemoteClosedHandler((closedConsumer) -> {
synchronized (this) {
try {
final FederationQueueEntry tracked = demandTracking.get(closedConsumer.getConsumerInfo().getQueueName());
if (tracked != null) {
tracked.clearConsumer();
}
} finally {
closedConsumer.close();
}
}
});
queueEntry.setConsumer(queueConsumer);
// Now that we are tracking it we can start it
queueConsumer.start();
signalAfterCreateFederationConsumer(queueConsumer);
} }
} }
@ -220,17 +251,15 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
// We ignore the remote address as locally the policy can be a wild card match and we can // We ignore the remote address as locally the policy can be a wild card match and we can
// try to federate based on the Queue only, if the remote rejects the federation consumer // try to federate based on the Queue only, if the remote rejects the federation consumer
// binding again the request will once more be recorded and we will get another event if // binding again the request will once more be recorded and we will get another event if
// the queue were recreated such that a match could be made. // the queue were recreated such that a match could be made. We retain all the current
if (started && testIfQueueMatchesPolicy(queueName)) { // demand and don't need to re-check the server state before trying to create the
// remote queue consumer.
if (started && testIfQueueMatchesPolicy(queueName) && demandTracking.containsKey(queueName)) {
final Queue queue = server.locateQueue(queueName);
// Find a matching Queue with the given name and then check for demand based if (queue != null) {
// on the attached consumers and the current policy constraints. tryCreateFederationConsumerForQueue(demandTracking.get(queueName), queue);
}
server.getPostOffice()
.getAllBindings()
.filter(b -> b instanceof QueueBinding && ((QueueBinding) b).getQueue().getName().toString().equals(queueName))
.map(b -> (QueueBinding) b)
.forEach(b -> checkQueueForMatch(b.getQueue()));
} }
} }
@ -286,13 +315,13 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
* instance. A subclass can override this method to return a more customized entry type with additional * instance. A subclass can override this method to return a more customized entry type with additional
* state data. * state data.
* *
* @param consumer * @param consumerInfo
* The {@link FederationConsumerInternal} instance that will be housed in this entry. * The consumer information that defines characteristics of the federation queue consumer
* *
* @return a new {@link FederationQueueEntry} that holds the given federation consumer. * @return a new {@link FederationQueueEntry} that holds the given queue name.
*/ */
protected FederationQueueEntry createServerConsumerEntry(FederationConsumerInternal consumer) { protected FederationQueueEntry createConsumerEntry(FederationConsumerInfo consumerInfo) {
return new FederationQueueEntry(consumer); return new FederationQueueEntry(consumerInfo);
} }
/** /**

View File

@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -3281,6 +3282,196 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
} }
} }
@Test(timeout = 20000)
public void testFederationAddressDemandTrackedWhenRemoteRejectsInitialAttempts() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes("test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
connection.start();
// First consumer we reject the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.respondInKind()
.withNullSource();
peer.expectFlow().withLinkCredit(1000);
peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10);
peer.expectDetach();
final MessageConsumer consumer1 = session.createConsumer(topic);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Second consumer we reject the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.respondInKind()
.withNullSource();
peer.expectFlow().withLinkCredit(1000);
peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10);
peer.expectDetach();
final MessageConsumer consumer2 = session.createConsumer(topic);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Third consumer we accept the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
final MessageConsumer consumer3 = session.createConsumer(topic);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer3.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer2.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// Demand should be gone now
consumer1.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}
@Test(timeout = 20000)
public void testFederationAddressDemandTrackedWhenPluginBlocksInitialAttempts() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes("test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
final AtomicInteger blockUntilZero = new AtomicInteger(2);
final AMQPTestFederationBrokerPlugin federationPlugin = new AMQPTestFederationBrokerPlugin();
federationPlugin.shouldCreateConsumerForDivert = (d, q) -> true;
federationPlugin.shouldCreateConsumerForQueue = (q) -> true;
federationPlugin.shouldCreateConsumerForAddress = (a) -> {
return blockUntilZero.getAndDecrement() == 0;
};
server.getConfiguration().addAMQPConnection(amqpConnection);
server.registerBrokerPlugin(federationPlugin);
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic("test");
connection.start();
final MessageConsumer consumer1 = session.createConsumer(topic);
final MessageConsumer consumer2 = session.createConsumer(topic);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Third consumer we expect the plugin to allow the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
final MessageConsumer consumer3 = session.createConsumer(topic);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer3.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer2.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// Demand should be gone now
consumer1.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}
private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) { private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>(); final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address); eventMap.put(REQUESTED_ADDRESS_NAME, address);

View File

@ -59,6 +59,13 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
@ -71,6 +78,7 @@ import javax.jms.Message;
import javax.jms.Queue; import javax.jms.Queue;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -80,10 +88,15 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy; import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@ -3249,6 +3262,200 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
} }
} }
@Test(timeout = 20000)
public void testFederationQueueDemandTrackedWhenRemoteRejectsInitialAttempts() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("test", "test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("test");
connection.start();
// First consumer we reject the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.respondInKind()
.withNullSource();
peer.expectFlow().withLinkCredit(1000);
peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10);
peer.expectDetach();
final MessageConsumer consumer1 = session.createConsumer(queue);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Second consumer we reject the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.respondInKind()
.withNullSource();
peer.expectFlow().withLinkCredit(1000);
peer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10);
peer.expectDetach();
final MessageConsumer consumer2 = session.createConsumer(queue);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Third consumer we accept the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.respond()
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
final MessageConsumer consumer3 = session.createConsumer(queue);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer3.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer2.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// Demand should be gone now
consumer1.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}
@Test(timeout = 20000)
public void testFederationQueueDemandTrackedWhenPluginBlocksInitialAttempts() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("test", "test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName("sample-federation");
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
final AtomicInteger blockUntilZero = new AtomicInteger(2);
final AMQPTestFederationBrokerPlugin federationPlugin = new AMQPTestFederationBrokerPlugin();
federationPlugin.shouldCreateConsumerForDivert = (d, q) -> true;
federationPlugin.shouldCreateConsumerForQueue = (q) -> {
return blockUntilZero.getAndDecrement() == 0;
};
server.getConfiguration().addAMQPConnection(amqpConnection);
server.registerBrokerPlugin(federationPlugin);
server.start();
server.createQueue(new QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("test");
connection.start();
final MessageConsumer consumer1 = session.createConsumer(queue);
final MessageConsumer consumer2 = session.createConsumer(queue);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Third consumer we expect the plugin to allow the federation attempt
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
final MessageConsumer consumer3 = session.createConsumer(queue);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer3.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Demand should remain
consumer2.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
// Demand should be gone now
consumer1.close();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}
private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) { private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>(); final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address); eventMap.put(REQUESTED_ADDRESS_NAME, address);
@ -3442,4 +3649,82 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
.withOfferedCapability(FEDERATION_EVENT_LINK.toString()); .withOfferedCapability(FEDERATION_EVENT_LINK.toString());
} }
} }
private class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin {
public final AtomicBoolean started = new AtomicBoolean();
public final AtomicBoolean stopped = new AtomicBoolean();
public final AtomicReference<FederationConsumerInfo> beforeCreateConsumerCapture = new AtomicReference<>();
public final AtomicReference<FederationConsumer> afterCreateConsumerCapture = new AtomicReference<>();
public final AtomicReference<FederationConsumer> beforeCloseConsumerCapture = new AtomicReference<>();
public final AtomicReference<FederationConsumer> afterCloseConsumerCapture = new AtomicReference<>();
public Consumer<FederationConsumerInfo> beforeCreateConsumer = (c) -> beforeCreateConsumerCapture.set(c);;
public Consumer<FederationConsumer> afterCreateConsumer = (c) -> afterCreateConsumerCapture.set(c);
public Consumer<FederationConsumer> beforeCloseConsumer = (c) -> beforeCloseConsumerCapture.set(c);
public Consumer<FederationConsumer> afterCloseConsumer = (c) -> afterCloseConsumerCapture.set(c);
public BiConsumer<FederationConsumer, org.apache.activemq.artemis.api.core.Message> beforeMessageHandled = (c, m) -> { };
public BiConsumer<FederationConsumer, org.apache.activemq.artemis.api.core.Message> afterMessageHandled = (c, m) -> { };
public Function<AddressInfo, Boolean> shouldCreateConsumerForAddress = (a) -> true;
public Function<org.apache.activemq.artemis.core.server.Queue, Boolean> shouldCreateConsumerForQueue = (q) -> true;
public BiFunction<Divert, org.apache.activemq.artemis.core.server.Queue, Boolean> shouldCreateConsumerForDivert = (d, q) -> true;
@Override
public void federationStarted(final Federation federation) throws ActiveMQException {
started.set(true);
}
@Override
public void federationStopped(final Federation federation) throws ActiveMQException {
stopped.set(true);
}
@Override
public void beforeCreateFederationConsumer(final FederationConsumerInfo consumerInfo) throws ActiveMQException {
beforeCreateConsumer.accept(consumerInfo);
}
@Override
public void afterCreateFederationConsumer(final FederationConsumer consumer) throws ActiveMQException {
afterCreateConsumer.accept(consumer);
}
@Override
public void beforeCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException {
beforeCloseConsumer.accept(consumer);
}
@Override
public void afterCloseFederationConsumer(final FederationConsumer consumer) throws ActiveMQException {
afterCloseConsumer.accept(consumer);
}
@Override
public void beforeFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException {
beforeMessageHandled.accept(consumer, message);
}
@Override
public void afterFederationConsumerMessageHandled(final FederationConsumer consumer, org.apache.activemq.artemis.api.core.Message message) throws ActiveMQException {
afterMessageHandled.accept(consumer, message);
}
@Override
public boolean shouldCreateFederationConsumerForAddress(final AddressInfo address) throws ActiveMQException {
return shouldCreateConsumerForAddress.apply(address);
}
@Override
public boolean shouldCreateFederationConsumerForQueue(final org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException {
return shouldCreateConsumerForQueue.apply(queue);
}
@Override
public boolean shouldCreateFederationConsumerForDivert(Divert divert, org.apache.activemq.artemis.core.server.Queue queue) throws ActiveMQException {
return shouldCreateConsumerForDivert.apply(divert, queue);
}
}
} }