ARTEMIS-4745 Allow configuration of the pull consumer batch size

Allow for configuration of the batch size granted to the remote when an
AMQP federation queue receiver is pulling messages only when there is
local capacity to handle them. Some code housekeeping is done here to
make adding future properties a bit simpler and require fewer changes.
This commit is contained in:
Timothy Bish 2024-04-26 11:00:26 -04:00 committed by Robbie Gemmell
parent ee7a2c0944
commit 659b17c3a9
11 changed files with 136 additions and 145 deletions

View File

@ -36,7 +36,6 @@ import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQ
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -132,39 +131,9 @@ public abstract class AMQPFederation implements FederationInternal {
public abstract AMQPSessionContext getSessionContext();
/**
* @return the timeout before signaling an error when creating remote link (0 mean disable).
* @return the federation configuration that is in effect.
*/
public abstract int getLinkAttachTimeout();
/**
* @return the configured {@link Receiver} link credit batch size.
*/
public abstract int getReceiverCredits();
/**
* @return the configured {@link Receiver} link credit low value.
*/
public abstract int getReceiverCreditsLow();
/**
* @return the size in bytes before a message is considered large.
*/
public abstract int getLargeMessageThreshold();
/**
* @return the true if the federation should ignore filters on queue consumers.
*/
public abstract boolean isIgnoreQueueConsumerFilters();
/**
* @return the true if the federation should ignore priorities on queue consumers.
*/
public abstract boolean isIgnoreQueueConsumerPriorities();
/**
* @return the true if the federation should support core message tunneling.
*/
public abstract boolean isCoreMessageTunnelingEnabled();
public abstract AMQPFederationConfiguration getConfiguration();
@Override
public final synchronized void start() throws ActiveMQException {

View File

@ -321,11 +321,11 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
final ScheduledFuture<?> openTimeoutTask;
final AtomicBoolean openTimedOut = new AtomicBoolean(false);
if (federation.getLinkAttachTimeout() > 0) {
if (configuration.getLinkAttachTimeout() > 0) {
openTimeoutTask = federation.getServer().getScheduledPool().schedule(() -> {
openTimedOut.set(true);
federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
}, federation.getLinkAttachTimeout(), TimeUnit.SECONDS);
}, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS);
} else {
openTimeoutTask = null;
}

View File

@ -99,7 +99,7 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
// Address consumers can't pull as we have no real metric to indicate when / how much
// we should pull so instead we refuse to match if credit set to zero.
if (federation.getReceiverCredits() <= 0) {
if (federation.getConfiguration().getReceiverCredits() <= 0) {
logger.debug("Federation address policy rejecting match on {} because credit is set to zero:", addressInfo.getName());
return false;
} else {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
@ -47,6 +48,11 @@ public final class AMQPFederationConfiguration {
*/
public static final int DEFAULT_LINK_ATTACH_TIMEOUT = 30;
/**
* Default credits granted to a receiver that is in pull mode.
*/
public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;
/**
* Default value for the core message tunneling feature that indicates if core protocol messages
* should be streamed as binary blobs as the payload of an custom AMQP message which avoids any
@ -112,6 +118,20 @@ public final class AMQPFederationConfiguration {
}
}
/**
* @return the credit batch size offered to a {@link Receiver} link that is in pull mode.
*/
public int getPullReceiverBatchSize() {
final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE);
if (property instanceof Number) {
return ((Number) property).intValue();
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return DEFAULT_PULL_CREDIT_BATCH_SIZE;
}
}
/**
* @return the size in bytes of an incoming message after which the {@link Receiver} treats it as large.
*/
@ -193,6 +213,7 @@ public final class AMQPFederationConfiguration {
configMap.put(RECEIVER_CREDITS, getReceiverCredits());
configMap.put(RECEIVER_CREDITS_LOW, getReceiverCreditsLow());
configMap.put(PULL_RECEIVER_BATCH_SIZE, getPullReceiverBatchSize());
configMap.put(LARGE_MESSAGE_THRESHOLD, getLargeMessageThreshold());
configMap.put(LINK_ATTACH_TIMEOUT, getLinkAttachTimeout());
configMap.put(IGNORE_QUEUE_CONSUMER_FILTERS, isIgnoreSubscriptionFilters());

View File

@ -77,6 +77,14 @@ public final class AMQPFederationConstants {
*/
public static final String RECEIVER_CREDITS_LOW = "amqpLowCredits";
/**
* Configuration property that defines the amount of credits to batch to an AMQP receiver link
* and the top up value when sending more credit once the broker has capacity available for
* them. this can be sent to the peer so that dual federation configurations share the same
* configuration on both sides of the connection.
*/
public static final String PULL_RECEIVER_BATCH_SIZE = "amqpPullConsumerCredits";
/**
* Configuration property used to convey the local side value to use when considering if a message
* is a large message, this can be sent to the peer so that dual federation configurations share

View File

@ -21,6 +21,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;
@ -29,6 +30,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.qpid.proton.engine.Receiver;
/**
* Configuration options applied to a consumer created from federation policies
@ -59,7 +61,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getReceiverCredits();
return federation.getConfiguration().getReceiverCredits();
}
}
@ -70,7 +72,21 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getReceiverCreditsLow();
return federation.getConfiguration().getReceiverCreditsLow();
}
}
/**
* @return the credit batch size offered to a {@link Receiver} link that is in pull mode.
*/
public int getPullReceiverBatchSize() {
final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE);
if (property instanceof Number) {
return ((Number) property).intValue();
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getConfiguration().getPullReceiverBatchSize();
}
}
@ -81,7 +97,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getLargeMessageThreshold();
return federation.getConfiguration().getLargeMessageThreshold();
}
}
@ -92,7 +108,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Integer.parseInt((String) property);
} else {
return federation.getLinkAttachTimeout();
return federation.getConfiguration().getLinkAttachTimeout();
}
}
@ -103,7 +119,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isCoreMessageTunnelingEnabled();
return federation.getConfiguration().isCoreMessageTunnelingEnabled();
}
}
@ -114,7 +130,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isIgnoreQueueConsumerFilters();
return federation.getConfiguration().isIgnoreSubscriptionFilters();
}
}
@ -125,7 +141,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) {
return Boolean.parseBoolean((String) property);
} else {
return federation.isIgnoreQueueConsumerPriorities();
return federation.getConfiguration().isIgnoreSubscriptionPriorities();
}
}
}

View File

@ -91,8 +91,6 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;
public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2;
public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30;
@ -315,11 +313,11 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
final ScheduledFuture<?> openTimeoutTask;
final AtomicBoolean openTimedOut = new AtomicBoolean(false);
if (federation.getLinkAttachTimeout() > 0) {
if (configuration.getLinkAttachTimeout() > 0) {
openTimeoutTask = federation.getServer().getScheduledPool().schedule(() -> {
openTimedOut.set(true);
federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
}, federation.getLinkAttachTimeout(), TimeUnit.SECONDS);
}, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS);
} else {
openTimeoutTask = null;
}
@ -516,7 +514,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
// credit. This also allows consumers created on the remote side of a federation connection
// to read from properties sent from the federation source that indicate the values that are
// configured on the local side.
if (federation.getReceiverCredits() > 0) {
if (configuration.getReceiverCredits() > 0) {
return createCreditRunnable(configuration.getReceiverCredits(), configuration.getReceiverCreditsLow(), receiver, connection, this);
} else {
return this::checkIfCreditTopUpNeeded;
@ -579,7 +577,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
return; // Closed before this was triggered.
}
receiver.flow(DEFAULT_PULL_CREDIT_BATCH_SIZE);
receiver.flow(configuration.getPullReceiverBatchSize());
connection.instantFlush();
lastBacklogCheckDelay = 0;
creditTopUpInProgress.set(false);

View File

@ -124,11 +124,6 @@ public class AMQPFederationSource extends AMQPFederation {
return brokerConnection;
}
@Override
public int getLinkAttachTimeout() {
return configuration.getLinkAttachTimeout();
}
@Override
public synchronized AMQPSessionContext getSessionContext() {
if (!connected) {
@ -148,58 +143,12 @@ public class AMQPFederationSource extends AMQPFederation {
}
@Override
public synchronized int getReceiverCredits() {
public synchronized AMQPFederationConfiguration getConfiguration() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
throw new IllegalStateException("Cannot access connection while federation is not connected");
}
return configuration.getReceiverCredits();
}
@Override
public synchronized int getReceiverCreditsLow() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}
return configuration.getReceiverCreditsLow();
}
@Override
public synchronized int getLargeMessageThreshold() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}
return configuration.getLargeMessageThreshold();
}
@Override
public boolean isCoreMessageTunnelingEnabled() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}
return configuration.isCoreMessageTunnelingEnabled();
}
@Override
public boolean isIgnoreQueueConsumerFilters() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}
return configuration.isIgnoreSubscriptionFilters();
}
@Override
public boolean isIgnoreQueueConsumerPriorities() {
if (!connected) {
throw new IllegalStateException("Cannot access connection configuration, federation is not connected");
}
return configuration.isIgnoreSubscriptionPriorities();
return configuration;
}
/**

View File

@ -67,38 +67,8 @@ public class AMQPFederationTarget extends AMQPFederation {
}
@Override
public int getReceiverCredits() {
return configuration.getReceiverCredits();
}
@Override
public int getReceiverCreditsLow() {
return configuration.getReceiverCreditsLow();
}
@Override
public int getLargeMessageThreshold() {
return configuration.getLargeMessageThreshold();
}
@Override
public int getLinkAttachTimeout() {
return configuration.getLinkAttachTimeout();
}
@Override
public boolean isCoreMessageTunnelingEnabled() {
return configuration.isCoreMessageTunnelingEnabled();
}
@Override
public boolean isIgnoreQueueConsumerFilters() {
return configuration.isIgnoreSubscriptionFilters();
}
@Override
public boolean isIgnoreQueueConsumerPriorities() {
return configuration.isIgnoreSubscriptionPriorities();
public synchronized AMQPFederationConfiguration getConfiguration() {
return configuration;
}
@Override

View File

@ -34,6 +34,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
@ -141,6 +142,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
final int AMQP_MIN_LARGE_MESSAGE_SIZE = 10_000;
final int AMQP_CREDITS = 100;
final int AMQP_CREDITS_LOW = 50;
final int AMQP_PULL_CREDITS_BATCH = 50;
final int AMQP_LINK_ATTACH_TIMEOUT = 60;
final boolean AMQP_TUNNEL_CORE_MESSAGES = false;
final boolean AMQP_INGNORE_CONSUMER_FILTERS = false;
@ -149,6 +151,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
final Map<String, Object> federationConfiguration = new HashMap<>();
federationConfiguration.put(RECEIVER_CREDITS, AMQP_CREDITS);
federationConfiguration.put(RECEIVER_CREDITS_LOW, AMQP_CREDITS_LOW);
federationConfiguration.put(PULL_RECEIVER_BATCH_SIZE, AMQP_PULL_CREDITS_BATCH);
federationConfiguration.put(LARGE_MESSAGE_THRESHOLD, AMQP_MIN_LARGE_MESSAGE_SIZE);
federationConfiguration.put(LINK_ATTACH_TIMEOUT, AMQP_LINK_ATTACH_TIMEOUT);
federationConfiguration.put(IGNORE_QUEUE_CONSUMER_FILTERS, AMQP_INGNORE_CONSUMER_FILTERS);
@ -183,6 +186,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement("myFederation");
federation.addProperty(LINK_ATTACH_TIMEOUT, AMQP_LINK_ATTACH_TIMEOUT);
federation.addProperty(AmqpSupport.TUNNEL_CORE_MESSAGES, Boolean.toString(AMQP_TUNNEL_CORE_MESSAGES));
federation.addProperty(PULL_RECEIVER_BATCH_SIZE, AMQP_PULL_CREDITS_BATCH);
amqpConnection.addElement(federation);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();

View File

@ -41,8 +41,9 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport.DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueConsumer.DEFAULT_PULL_CREDIT_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.allOf;
@ -2566,7 +2567,29 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
}
@Test(timeout = 20000)
public void testPullQueueConsumerGrantsCreditOnEmptyQueue() throws Exception {
public void testPullQueueConsumerGrantsDefaultCreditOnEmptyQueue() throws Exception {
doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(0, false, 0, false, DEFAULT_PULL_CREDIT_BATCH_SIZE);
}
@Test(timeout = 20000)
public void testPullQueueConsumerGrantsReceiverConfiguredCreditOnEmptyQueue() throws Exception {
doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(0, false, 10, true, 10);
}
@Test(timeout = 20000)
public void testPullQueueConsumerGrantsFederationConfiguredCreditOnEmptyQueue() throws Exception {
doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(20, true, 0, false, 20);
}
@Test(timeout = 20000)
public void testPullQueueConsumerGrantsReceiverConfiguredCreditOverFederationConfiguredOnEmptyQueue() throws Exception {
doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(20, true, 10, true, 10);
}
private void doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(int globalBatch, boolean setGlobal,
int receiverBatch, boolean setReceiver,
int expected) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
@ -2586,14 +2609,20 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("test", "test");
receiveFromQueue.addProperty(RECEIVER_CREDITS, 0);
if (setReceiver) {
receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, receiverBatch);
}
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
if (setGlobal) {
element.addProperty(PULL_RECEIVER_BATCH_SIZE, globalBatch);
}
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(
getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "?amqpCredits=0");
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
@ -2611,7 +2640,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.respondInKind();
peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE);
peer.expectFlow().withLinkCredit(expected);
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
@ -2706,6 +2735,27 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrain() throws Exception {
doTestPullConsumerCreditTopUpAfterEachBacklogDrain(0, false, 0, false, DEFAULT_PULL_CREDIT_BATCH_SIZE);
}
@Test(timeout = 30000)
public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainFederationConfigured() throws Exception {
doTestPullConsumerCreditTopUpAfterEachBacklogDrain(10, true, 0, false, 10);
}
@Test(timeout = 30000)
public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainPolicyConfigured() throws Exception {
doTestPullConsumerCreditTopUpAfterEachBacklogDrain(0, false, 20, true, 20);
}
@Test(timeout = 30000)
public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainBothConfigured() throws Exception {
doTestPullConsumerCreditTopUpAfterEachBacklogDrain(100, true, 20, true, 20);
}
private void doTestPullConsumerCreditTopUpAfterEachBacklogDrain(int globalBatch, boolean setGlobal,
int receiverBatch, boolean setReceiver,
int expected) throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
@ -2727,10 +2777,16 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("test", "test");
if (setReceiver) {
receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, receiverBatch);
}
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
if (setGlobal) {
element.addProperty(PULL_RECEIVER_BATCH_SIZE, globalBatch);
}
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(
@ -2769,7 +2825,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
connection.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE);
peer.expectFlow().withLinkCredit(expected);
// Remove the backlog and credit should be offered to the remote
assertNotNull(consumer.receiveNoWait());
@ -2777,7 +2833,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
peer.waitForScriptToComplete(20, TimeUnit.SECONDS);
// Consume all the credit that was presented in the batch
for (int i = 0; i < DEFAULT_PULL_CREDIT_BATCH_SIZE; ++i) {
for (int i = 0; i < expected; ++i) {
peer.expectDisposition().withState().accepted();
peer.remoteTransfer().withBody().withString("test-message")
.also()
@ -2785,19 +2841,19 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
.now();
}
Wait.assertTrue(() -> server.queueQuery(queueName).getMessageCount() == DEFAULT_PULL_CREDIT_BATCH_SIZE, 10_000);
Wait.assertTrue(() -> server.queueQuery(queueName).getMessageCount() == expected, 10_000);
// Consume all the newly received message from the remote except one
// which should leave the queue with a pending message so no credit
// should be offered.
for (int i = 0; i < DEFAULT_PULL_CREDIT_BATCH_SIZE - 1; ++i) {
for (int i = 0; i < expected - 1; ++i) {
assertNotNull(consumer.receiveNoWait());
}
// We should not get a new batch yet as there is still one pending
// message on the local queue we have not consumed.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE);
peer.expectFlow().withLinkCredit(expected);
// Remove the backlog and credit should be offered to the remote again
assertNotNull(consumer.receiveNoWait());