From 659b17c3a933a7d80f9148836d7b9bbd4275932f Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 26 Apr 2024 11:00:26 -0400 Subject: [PATCH] ARTEMIS-4745 Allow configuration of the pull consumer batch size Allow for configuration of the batch size granted to the remote when an AMQP federation queue receiver is pulling messages only when there is local capacity to handle them. Some code housekeeping is done here to make adding future properties a bit simpler and require fewer changes. --- .../connect/federation/AMQPFederation.java | 35 +-------- .../AMQPFederationAddressConsumer.java | 4 +- .../AMQPFederationAddressPolicyManager.java | 2 +- .../AMQPFederationConfiguration.java | 21 +++++ .../federation/AMQPFederationConstants.java | 8 ++ .../AMQPFederationConsumerConfiguration.java | 30 ++++++-- .../AMQPFederationQueueConsumer.java | 10 +-- .../federation/AMQPFederationSource.java | 57 +------------- .../federation/AMQPFederationTarget.java | 34 +-------- .../connect/AMQPFederationConnectTest.java | 4 + .../AMQPFederationQueuePolicyTest.java | 76 ++++++++++++++++--- 11 files changed, 136 insertions(+), 145 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java index d2a8cf3542..1f3c818155 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java @@ -36,7 +36,6 @@ import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,39 +131,9 @@ public abstract class AMQPFederation implements FederationInternal { public abstract AMQPSessionContext getSessionContext(); /** - * @return the timeout before signaling an error when creating remote link (0 mean disable). + * @return the federation configuration that is in effect. */ - public abstract int getLinkAttachTimeout(); - - /** - * @return the configured {@link Receiver} link credit batch size. - */ - public abstract int getReceiverCredits(); - - /** - * @return the configured {@link Receiver} link credit low value. - */ - public abstract int getReceiverCreditsLow(); - - /** - * @return the size in bytes before a message is considered large. - */ - public abstract int getLargeMessageThreshold(); - - /** - * @return the true if the federation should ignore filters on queue consumers. - */ - public abstract boolean isIgnoreQueueConsumerFilters(); - - /** - * @return the true if the federation should ignore priorities on queue consumers. - */ - public abstract boolean isIgnoreQueueConsumerPriorities(); - - /** - * @return the true if the federation should support core message tunneling. - */ - public abstract boolean isCoreMessageTunnelingEnabled(); + public abstract AMQPFederationConfiguration getConfiguration(); @Override public final synchronized void start() throws ActiveMQException { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java index 777452c708..2e8346a63f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java @@ -321,11 +321,11 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal final ScheduledFuture openTimeoutTask; final AtomicBoolean openTimedOut = new AtomicBoolean(false); - if (federation.getLinkAttachTimeout() > 0) { + if (configuration.getLinkAttachTimeout() > 0) { openTimeoutTask = federation.getServer().getScheduledPool().schedule(() -> { openTimedOut.set(true); federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout()); - }, federation.getLinkAttachTimeout(), TimeUnit.SECONDS); + }, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS); } else { openTimeoutTask = null; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java index f6bc4f9e2d..ca5ce721a9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java @@ -99,7 +99,7 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM // Address consumers can't pull as we have no real metric to indicate when / how much // we should pull so instead we refuse to match if credit set to zero. - if (federation.getReceiverCredits() <= 0) { + if (federation.getConfiguration().getReceiverCredits() <= 0) { logger.debug("Federation address policy rejecting match on {} because credit is set to zero:", addressInfo.getName()); return false; } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java index f549878323..a27443d690 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect.federation; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS; @@ -47,6 +48,11 @@ public final class AMQPFederationConfiguration { */ public static final int DEFAULT_LINK_ATTACH_TIMEOUT = 30; + /** + * Default credits granted to a receiver that is in pull mode. + */ + public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100; + /** * Default value for the core message tunneling feature that indicates if core protocol messages * should be streamed as binary blobs as the payload of an custom AMQP message which avoids any @@ -112,6 +118,20 @@ public final class AMQPFederationConfiguration { } } + /** + * @return the credit batch size offered to a {@link Receiver} link that is in pull mode. + */ + public int getPullReceiverBatchSize() { + final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE); + if (property instanceof Number) { + return ((Number) property).intValue(); + } else if (property instanceof String) { + return Integer.parseInt((String) property); + } else { + return DEFAULT_PULL_CREDIT_BATCH_SIZE; + } + } + /** * @return the size in bytes of an incoming message after which the {@link Receiver} treats it as large. */ @@ -193,6 +213,7 @@ public final class AMQPFederationConfiguration { configMap.put(RECEIVER_CREDITS, getReceiverCredits()); configMap.put(RECEIVER_CREDITS_LOW, getReceiverCreditsLow()); + configMap.put(PULL_RECEIVER_BATCH_SIZE, getPullReceiverBatchSize()); configMap.put(LARGE_MESSAGE_THRESHOLD, getLargeMessageThreshold()); configMap.put(LINK_ATTACH_TIMEOUT, getLinkAttachTimeout()); configMap.put(IGNORE_QUEUE_CONSUMER_FILTERS, isIgnoreSubscriptionFilters()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java index 30e94a2be9..85183c121d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java @@ -77,6 +77,14 @@ public final class AMQPFederationConstants { */ public static final String RECEIVER_CREDITS_LOW = "amqpLowCredits"; + /** + * Configuration property that defines the amount of credits to batch to an AMQP receiver link + * and the top up value when sending more credit once the broker has capacity available for + * them. this can be sent to the peer so that dual federation configurations share the same + * configuration on both sides of the connection. + */ + public static final String PULL_RECEIVER_BATCH_SIZE = "amqpPullConsumerCredits"; + /** * Configuration property used to convey the local side value to use when considering if a message * is a large message, this can be sent to the peer so that dual federation configurations share diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java index 0f5dfd0c1c..cced975129 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java @@ -21,6 +21,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.qpid.proton.engine.Receiver; /** * Configuration options applied to a consumer created from federation policies @@ -59,7 +61,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getReceiverCredits(); + return federation.getConfiguration().getReceiverCredits(); } } @@ -70,7 +72,21 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getReceiverCreditsLow(); + return federation.getConfiguration().getReceiverCreditsLow(); + } + } + + /** + * @return the credit batch size offered to a {@link Receiver} link that is in pull mode. + */ + public int getPullReceiverBatchSize() { + final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE); + if (property instanceof Number) { + return ((Number) property).intValue(); + } else if (property instanceof String) { + return Integer.parseInt((String) property); + } else { + return federation.getConfiguration().getPullReceiverBatchSize(); } } @@ -81,7 +97,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getLargeMessageThreshold(); + return federation.getConfiguration().getLargeMessageThreshold(); } } @@ -92,7 +108,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getLinkAttachTimeout(); + return federation.getConfiguration().getLinkAttachTimeout(); } } @@ -103,7 +119,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Boolean.parseBoolean((String) property); } else { - return federation.isCoreMessageTunnelingEnabled(); + return federation.getConfiguration().isCoreMessageTunnelingEnabled(); } } @@ -114,7 +130,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Boolean.parseBoolean((String) property); } else { - return federation.isIgnoreQueueConsumerFilters(); + return federation.getConfiguration().isIgnoreSubscriptionFilters(); } } @@ -125,7 +141,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Boolean.parseBoolean((String) property); } else { - return federation.isIgnoreQueueConsumerPriorities(); + return federation.getConfiguration().isIgnoreSubscriptionPriorities(); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java index 086ce23c01..03fec69e54 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java @@ -91,8 +91,6 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100; - public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2; public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30; @@ -315,11 +313,11 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal { final ScheduledFuture openTimeoutTask; final AtomicBoolean openTimedOut = new AtomicBoolean(false); - if (federation.getLinkAttachTimeout() > 0) { + if (configuration.getLinkAttachTimeout() > 0) { openTimeoutTask = federation.getServer().getScheduledPool().schedule(() -> { openTimedOut.set(true); federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout()); - }, federation.getLinkAttachTimeout(), TimeUnit.SECONDS); + }, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS); } else { openTimeoutTask = null; } @@ -516,7 +514,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal { // credit. This also allows consumers created on the remote side of a federation connection // to read from properties sent from the federation source that indicate the values that are // configured on the local side. - if (federation.getReceiverCredits() > 0) { + if (configuration.getReceiverCredits() > 0) { return createCreditRunnable(configuration.getReceiverCredits(), configuration.getReceiverCreditsLow(), receiver, connection, this); } else { return this::checkIfCreditTopUpNeeded; @@ -579,7 +577,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal { return; // Closed before this was triggered. } - receiver.flow(DEFAULT_PULL_CREDIT_BATCH_SIZE); + receiver.flow(configuration.getPullReceiverBatchSize()); connection.instantFlush(); lastBacklogCheckDelay = 0; creditTopUpInProgress.set(false); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java index d68fc6aa8c..96fbdbe9a3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java @@ -124,11 +124,6 @@ public class AMQPFederationSource extends AMQPFederation { return brokerConnection; } - @Override - public int getLinkAttachTimeout() { - return configuration.getLinkAttachTimeout(); - } - @Override public synchronized AMQPSessionContext getSessionContext() { if (!connected) { @@ -148,58 +143,12 @@ public class AMQPFederationSource extends AMQPFederation { } @Override - public synchronized int getReceiverCredits() { + public synchronized AMQPFederationConfiguration getConfiguration() { if (!connected) { - throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); + throw new IllegalStateException("Cannot access connection while federation is not connected"); } - return configuration.getReceiverCredits(); - } - - @Override - public synchronized int getReceiverCreditsLow() { - if (!connected) { - throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); - } - - return configuration.getReceiverCreditsLow(); - } - - @Override - public synchronized int getLargeMessageThreshold() { - if (!connected) { - throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); - } - - return configuration.getLargeMessageThreshold(); - } - - @Override - public boolean isCoreMessageTunnelingEnabled() { - if (!connected) { - throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); - } - - return configuration.isCoreMessageTunnelingEnabled(); - } - - - @Override - public boolean isIgnoreQueueConsumerFilters() { - if (!connected) { - throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); - } - - return configuration.isIgnoreSubscriptionFilters(); - } - - @Override - public boolean isIgnoreQueueConsumerPriorities() { - if (!connected) { - throw new IllegalStateException("Cannot access connection configuration, federation is not connected"); - } - - return configuration.isIgnoreSubscriptionPriorities(); + return configuration; } /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java index 581dd47745..e13527eb9b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java @@ -67,38 +67,8 @@ public class AMQPFederationTarget extends AMQPFederation { } @Override - public int getReceiverCredits() { - return configuration.getReceiverCredits(); - } - - @Override - public int getReceiverCreditsLow() { - return configuration.getReceiverCreditsLow(); - } - - @Override - public int getLargeMessageThreshold() { - return configuration.getLargeMessageThreshold(); - } - - @Override - public int getLinkAttachTimeout() { - return configuration.getLinkAttachTimeout(); - } - - @Override - public boolean isCoreMessageTunnelingEnabled() { - return configuration.isCoreMessageTunnelingEnabled(); - } - - @Override - public boolean isIgnoreQueueConsumerFilters() { - return configuration.isIgnoreSubscriptionFilters(); - } - - @Override - public boolean isIgnoreQueueConsumerPriorities() { - return configuration.isIgnoreSubscriptionPriorities(); + public synchronized AMQPFederationConfiguration getConfiguration() { + return configuration; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java index 11bcf8c642..13b2619ed5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java @@ -34,6 +34,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED; @@ -141,6 +142,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { final int AMQP_MIN_LARGE_MESSAGE_SIZE = 10_000; final int AMQP_CREDITS = 100; final int AMQP_CREDITS_LOW = 50; + final int AMQP_PULL_CREDITS_BATCH = 50; final int AMQP_LINK_ATTACH_TIMEOUT = 60; final boolean AMQP_TUNNEL_CORE_MESSAGES = false; final boolean AMQP_INGNORE_CONSUMER_FILTERS = false; @@ -149,6 +151,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { final Map federationConfiguration = new HashMap<>(); federationConfiguration.put(RECEIVER_CREDITS, AMQP_CREDITS); federationConfiguration.put(RECEIVER_CREDITS_LOW, AMQP_CREDITS_LOW); + federationConfiguration.put(PULL_RECEIVER_BATCH_SIZE, AMQP_PULL_CREDITS_BATCH); federationConfiguration.put(LARGE_MESSAGE_THRESHOLD, AMQP_MIN_LARGE_MESSAGE_SIZE); federationConfiguration.put(LINK_ATTACH_TIMEOUT, AMQP_LINK_ATTACH_TIMEOUT); federationConfiguration.put(IGNORE_QUEUE_CONSUMER_FILTERS, AMQP_INGNORE_CONSUMER_FILTERS); @@ -183,6 +186,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement("myFederation"); federation.addProperty(LINK_ATTACH_TIMEOUT, AMQP_LINK_ATTACH_TIMEOUT); federation.addProperty(AmqpSupport.TUNNEL_CORE_MESSAGES, Boolean.toString(AMQP_TUNNEL_CORE_MESSAGES)); + federation.addProperty(PULL_RECEIVER_BATCH_SIZE, AMQP_PULL_CREDITS_BATCH); amqpConnection.addElement(federation); server.getConfiguration().addAMQPConnection(amqpConnection); server.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index 3a987e8b5e..4cb32f385d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -41,8 +41,9 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport.DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT; -import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueConsumer.DEFAULT_PULL_CREDIT_BATCH_SIZE; +import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.allOf; @@ -2566,7 +2567,29 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { } @Test(timeout = 20000) - public void testPullQueueConsumerGrantsCreditOnEmptyQueue() throws Exception { + public void testPullQueueConsumerGrantsDefaultCreditOnEmptyQueue() throws Exception { + doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(0, false, 0, false, DEFAULT_PULL_CREDIT_BATCH_SIZE); + } + + @Test(timeout = 20000) + public void testPullQueueConsumerGrantsReceiverConfiguredCreditOnEmptyQueue() throws Exception { + doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(0, false, 10, true, 10); + } + + @Test(timeout = 20000) + public void testPullQueueConsumerGrantsFederationConfiguredCreditOnEmptyQueue() throws Exception { + doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(20, true, 0, false, 20); + } + + @Test(timeout = 20000) + public void testPullQueueConsumerGrantsReceiverConfiguredCreditOverFederationConfiguredOnEmptyQueue() throws Exception { + doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(20, true, 10, true, 10); + } + + private void doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(int globalBatch, boolean setGlobal, + int receiverBatch, boolean setReceiver, + int expected) throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { peer.expectSASLAnonymousConnect(); peer.expectOpen().respond(); @@ -2586,14 +2609,20 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); receiveFromQueue.setName("queue-policy"); receiveFromQueue.addToIncludes("test", "test"); + receiveFromQueue.addProperty(RECEIVER_CREDITS, 0); + if (setReceiver) { + receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, receiverBatch); + } final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); element.setName(getTestName()); element.addLocalQueuePolicy(receiveFromQueue); + if (setGlobal) { + element.addProperty(PULL_RECEIVER_BATCH_SIZE, globalBatch); + } final AMQPBrokerConnectConfiguration amqpConnection = - new AMQPBrokerConnectConfiguration( - getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "?amqpCredits=0"); + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); amqpConnection.setReconnectAttempts(0);// No reconnects amqpConnection.addElement(element); @@ -2611,7 +2640,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { containsString("queue-receiver"), containsString(server.getNodeID().toString()))) .respondInKind(); - peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE); + peer.expectFlow().withLinkCredit(expected); final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); @@ -2706,6 +2735,27 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { @Test(timeout = 30000) public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrain() throws Exception { + doTestPullConsumerCreditTopUpAfterEachBacklogDrain(0, false, 0, false, DEFAULT_PULL_CREDIT_BATCH_SIZE); + } + + @Test(timeout = 30000) + public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainFederationConfigured() throws Exception { + doTestPullConsumerCreditTopUpAfterEachBacklogDrain(10, true, 0, false, 10); + } + + @Test(timeout = 30000) + public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainPolicyConfigured() throws Exception { + doTestPullConsumerCreditTopUpAfterEachBacklogDrain(0, false, 20, true, 20); + } + + @Test(timeout = 30000) + public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainBothConfigured() throws Exception { + doTestPullConsumerCreditTopUpAfterEachBacklogDrain(100, true, 20, true, 20); + } + + private void doTestPullConsumerCreditTopUpAfterEachBacklogDrain(int globalBatch, boolean setGlobal, + int receiverBatch, boolean setReceiver, + int expected) throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { peer.expectSASLAnonymousConnect(); peer.expectOpen().respond(); @@ -2727,10 +2777,16 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); receiveFromQueue.setName("queue-policy"); receiveFromQueue.addToIncludes("test", "test"); + if (setReceiver) { + receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, receiverBatch); + } final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); element.setName(getTestName()); element.addLocalQueuePolicy(receiveFromQueue); + if (setGlobal) { + element.addProperty(PULL_RECEIVER_BATCH_SIZE, globalBatch); + } final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration( @@ -2769,7 +2825,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { connection.start(); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE); + peer.expectFlow().withLinkCredit(expected); // Remove the backlog and credit should be offered to the remote assertNotNull(consumer.receiveNoWait()); @@ -2777,7 +2833,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(20, TimeUnit.SECONDS); // Consume all the credit that was presented in the batch - for (int i = 0; i < DEFAULT_PULL_CREDIT_BATCH_SIZE; ++i) { + for (int i = 0; i < expected; ++i) { peer.expectDisposition().withState().accepted(); peer.remoteTransfer().withBody().withString("test-message") .also() @@ -2785,19 +2841,19 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { .now(); } - Wait.assertTrue(() -> server.queueQuery(queueName).getMessageCount() == DEFAULT_PULL_CREDIT_BATCH_SIZE, 10_000); + Wait.assertTrue(() -> server.queueQuery(queueName).getMessageCount() == expected, 10_000); // Consume all the newly received message from the remote except one // which should leave the queue with a pending message so no credit // should be offered. - for (int i = 0; i < DEFAULT_PULL_CREDIT_BATCH_SIZE - 1; ++i) { + for (int i = 0; i < expected - 1; ++i) { assertNotNull(consumer.receiveNoWait()); } // We should not get a new batch yet as there is still one pending // message on the local queue we have not consumed. peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE); + peer.expectFlow().withLinkCredit(expected); // Remove the backlog and credit should be offered to the remote again assertNotNull(consumer.receiveNoWait());