ARTEMIS-4314 support queue federation batchOnCapacity via consumerWindowSize=0

This commit is contained in:
Gary Tully 2023-06-15 17:15:00 +01:00
parent d00b9ad014
commit a8b4ee1992
12 changed files with 346 additions and 11 deletions

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@ -133,6 +134,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private volatile boolean ackIndividually;
private final ClassLoader contextClassLoader;
private volatile boolean manualFlowManagement;
public ClientConsumerImpl(final ClientSessionInternal session,
final ConsumerContext consumerContext,
@ -406,6 +408,13 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
return receiverThread;
}
/**
 * Installs the given handler and marks this consumer as using manual flow management.
 * <p>
 * {@code checkClosed()} is invoked first. After this call the {@code manualFlowManagement}
 * flag is set, which makes {@code flowControlBeforeConsumption} return without doing any
 * per-message flow control.
 *
 * @param theHandler the handler to install
 * @return this consumer
 * @throws ActiveMQException if raised by {@code checkClosed()}
 */
@Override
public ClientConsumer setManualFlowMessageHandler(final MessageHandler theHandler) throws ActiveMQException {
checkClosed();
// the handler is assigned before the flag; keep this order unless both writes are reviewed together
this.handler = theHandler;
this.manualFlowManagement = true;
return this;
}
// Must be synchronized since messages may be arriving while handler is being set and might otherwise end
// up not queueing enough executors - so messages get stranded
@ -849,7 +858,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
}
private void resetIfSlowConsumer() {
@Override
public void resetIfSlowConsumer() {
if (clientWindowSize == 0) {
sendCredits(0);
@ -1041,6 +1051,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
* @throws ActiveMQException
*/
private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
if (manualFlowManagement) {
return;
}
// Chunk messages will execute the flow control while receiving the chunks
if (message.getFlowControlSize() != 0) {
// on large messages we should discount 1 on the first packets as we need continuity until the last packet

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.utils.FutureLatch;
public interface ClientConsumerInternal extends ClientConsumer {
@ -76,4 +77,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
ClientSession.QueueQuery getQueueInfo();
long getForceDeliveryCount();
ClientConsumer setManualFlowMessageHandler(MessageHandler theHandler) throws ActiveMQException;
void resetIfSlowConsumer();
}

View File

@ -389,6 +389,8 @@ public interface Queue extends Bindable,CriticalComponent {
boolean hasMatchingConsumer(Message message);
long getPendingMessageCount();
Collection<Consumer> getConsumers();
Map<SimpleString, Consumer> getGroups();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.federation;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -25,20 +26,23 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {
@ -60,7 +64,9 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
private ClientSessionFactoryInternal clientSessionFactory;
private ClientSession clientSession;
private ClientConsumer clientConsumer;
private ClientConsumerInternal clientConsumer;
private final AtomicInteger pendingPullCredit = new AtomicInteger();
private QueueHandle queueHandle;
public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
this.federation = federation;
@ -135,9 +141,16 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
if (clientSessionCallback != null) {
clientSessionCallback.callback(clientSession);
}
if (clientSession.queueQuery(key.getQueueName()).isExists()) {
this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
this.clientConsumer.setMessageHandler(this);
ClientSession.QueueQuery queryResult = clientSession.queueQuery(key.getQueueName());
if (queryResult.isExists()) {
this.clientConsumer = (ClientConsumerInternal) clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
if (this.clientConsumer.getClientWindowSize() == 0) {
this.clientConsumer.setManualFlowMessageHandler(this);
queueHandle = createQueueHandle(server, queryResult);
scheduleCreditOnEmpty(0, queueHandle);
} else {
this.clientConsumer.setMessageHandler(this);
}
} else {
throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
}
@ -155,6 +168,75 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
}
}
/**
 * Narrow view of a queue exposing only what the pull-credit scheduling needs:
 * a message count, the credit window to grant, and the executor to run the check on.
 */
interface QueueHandle {

   /**
    * @return the message count used to decide whether credit may be granted
    */
   long getMessageCount();

   /**
    * @return the amount of credit to grant in one batch
    */
   int getCreditWindow();

   /**
    * @return the executor on which the message count is inspected
    */
   Executor getExecutor();
}
/**
 * Builds a {@link QueueHandle} backed by the local queue that has the same name as the queried remote queue.
 * <p>
 * The credit window is the queried default consumer window size when that value is present and
 * positive; otherwise {@code DEFAULT_CONSUMER_WINDOW_SIZE} is used. A non-positive value is
 * reported at trace level when it is overridden.
 *
 * @param server      the server used to locate the local queue
 * @param queryResult the result of querying the remote queue
 * @return a handle exposing the local pending message count, the credit window and the queue executor
 */
private QueueHandle createQueueHandle(ActiveMQServer server, ClientSession.QueueQuery queryResult) {
   final Queue localQueue = server.locateQueue(queryResult.getName());
   final Integer configuredWindow = queryResult.getDefaultConsumerWindowSize();

   final int window;
   if (configuredWindow == null) {
      window = DEFAULT_CONSUMER_WINDOW_SIZE;
   } else if (configuredWindow.intValue() > 0) {
      window = configuredWindow.intValue();
   } else {
      window = DEFAULT_CONSUMER_WINDOW_SIZE;
      logger.trace("{} override non positive queue consumerWindowSize with {}.", this, window);
   }

   return new QueueHandle() {
      @Override
      public long getMessageCount() {
         return localQueue.getPendingMessageCount();
      }

      @Override
      public int getCreditWindow() {
         return window;
      }

      @Override
      public Executor getExecutor() {
         return localQueue.getExecutor();
      }
   };
}
/**
 * Schedules a check of the queue behind {@code handle} and grants one credit window to the
 * client consumer when that queue reports no messages.
 * <p>
 * The check runs after {@code delay} seconds and is handed to the handle's executor. If the
 * queue still holds messages, no credit is granted and the check is rescheduled with the delay
 * returned by {@code FederatedQueueConsumer.getNextDelay}; on the first attempt
 * ({@code delay == 0}) {@code resetIfSlowConsumer()} is called and the pending credit is
 * cleared. Nothing happens, and nothing is rescheduled, when there is no client consumer.
 *
 * @param delay  seconds to wait before checking; {@code 0} marks the first attempt
 * @param handle view of the queue whose message count gates the credit
 */
private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
   scheduledExecutorService.schedule(() -> {
      // use queue executor to sync on message count metric
      handle.getExecutor().execute(() -> {
         // read the field once so the null check and the later call see the same consumer
         final ClientConsumerInternal consumer = clientConsumer;
         if (consumer == null) {
            return;
         }
         if (0L == handle.getMessageCount()) {
            final int creditWindow = handle.getCreditWindow();
            // record the outstanding credit BEFORE granting it: the message handler only
            // decrements while the counter is positive, so a message delivered between the
            // grant and a later set() would go unaccounted and the next pull never triggered
            pendingPullCredit.set(creditWindow);
            flow(creditWindow);
         } else {
            if (0 == delay) {
               consumer.resetIfSlowConsumer();
               pendingPullCredit.set(0);
            }
            scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
         }
      });
   }, delay, TimeUnit.SECONDS);
}
/**
 * Grants {@code creditWindow} credit through the client consumer, if there is one.
 * <p>
 * This is best effort: an {@link ActiveMQException} from the flow control call is logged at
 * trace level and not rethrown.
 *
 * @param creditWindow the amount of credit to pass to {@code flowControl}
 */
private void flow(int creditWindow) {
   try {
      if (this.clientConsumer == null) {
         return;
      }
      this.clientConsumer.flowControl(creditWindow, false);
   } catch (ActiveMQException e) {
      // deliberately not propagated; the failure is only traced
      logger.trace("{} failed to flowControl with credit {}.", this, creditWindow, e);
   }
}
@Override
public synchronized void close() {
if (started) {
@ -225,6 +307,13 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
}
clientMessage.acknowledge();
if (pendingPullCredit.get() > 0) {
final int delta = ((ClientMessageInternal) clientMessage).getFlowControlSize();
if (pendingPullCredit.addAndGet(-delta) < 0) {
scheduleCreditOnEmpty(0, queueHandle);
}
}
if (server.hasBrokerFederationPlugins()) {
try {
server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));

View File

@ -1621,7 +1621,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return createdTimestamp;
}
public long getMessageCountForRing() {
/**
 * Returns the message count reported by {@code pendingMetrics}, cast to {@code long}.
 */
@Override
public long getPendingMessageCount() {
return (long) pendingMetrics.getMessageCount();
}
@ -4610,7 +4611,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void enforceRing(MessageReference refToAck, boolean scheduling, boolean head) {
int adjustment = head ? 1 : 0;
if (getMessageCountForRing() + adjustment > ringSize) {
if (getPendingMessageCount() + adjustment > ringSize) {
refToAck = refToAck == null ? messageReferences.poll() : refToAck;
if (refToAck != null) {

View File

@ -742,6 +742,11 @@ public class RoutingContextTest {
return false;
}
// Stub implementation: always reports zero pending messages.
@Override
public long getPendingMessageCount() {
return 0;
}
@Override
public Collection<Consumer> getConsumers() {
return null;

View File

@ -1488,6 +1488,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return false;
}
// Stub implementation: always reports zero pending messages.
@Override
public long getPendingMessageCount() {
return 0;
}
@Override
public Collection<Consumer> getConsumers() {
return null;

View File

@ -49,6 +49,10 @@ e.g. as many messages as possible are consumed from the same broker as they were
Here for such a migration with blue/green or canary moving a number of consumers on the same queue, you may want to set the `priority-adjustment` to 0, or even a positive value, so message would actively flow to the federated queue.
* Dual Federation - potential for messages to flip-flop between clusters.
If the backlog on your queues exceeds the available local credit across consumers, any lower priority federation consumer becomes a candidate for dispatch and messages will be federated. Eventually all messages may migrate and the scenario can repeat on the other cluster. Applying a rate limit to the connector url can help mitigate but this could have an adverse effect on migration when there are no local consumers.
To better support this use case, it is possible to configure the consumerWindowSize to zero on the referenced connector URI: ```tcp://<host>:<port>?consumerWindowSize=0```. This will cause the federation consumer to pull messages in batches only when the local queue has excess capacity. This means that federation won't ever drain more messages than it can handle, so messages will not flip-flop between clusters. The batch size is derived from the relevant address settings defaultConsumerWindowSize.
## Configuring Queue Federation
Federation is configured in `broker.xml`.

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.federation;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.Collections;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
/**
 * Integration tests for queue federation where the upstream connector is configured with
 * {@code consumerWindowSize=0}, so the federation consumer pulls messages in batches.
 */
public class FederatedQueuePullConsumerTest extends FederatedTestBase {
// Registers the "server-pull-1" connector on server 0 with consumerWindowSize=0 and ackBatchSize=10.
@Override
@Before
public void setUp() throws Exception {
super.setUp();
getServer(0).getConfiguration().addConnectorConfiguration("server-pull-1", "tcp://localhost:" + 61617 + "?consumerWindowSize=0;ackBatchSize=10");
}
@Override
protected boolean isNetty() {
// such that url params can be used for the server-pull-1 connector, vm urls don't propagate url params!
return true;
}
// Disables auto-creation of addresses and queues, sets the default consumer window size
// to 20 * 300, and creates a queue named after the running test.
@Override
protected void configureQueues(ActiveMQServer server) throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false)
.setDefaultConsumerWindowSize(20 * 300));
createSimpleQueue(server, getName());
}
// Returns an in-vm connection factory for server i whose consumers use a window size of 0.
protected ConnectionFactory getCF(int i) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + i);
// pull consumers to allow deterministic message consumption
factory.setConsumerWindowSize(0);
return factory;
}
// Deploys an upstream queue federation on server 0 through the "server-pull-1" connector,
// then runs the shared pull scenario against the test queue.
@Test
public void testFederatedQueuePullFromUpstream() throws Exception {
String queueName = getName();
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server-pull-1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
testFederatedQueuePullFromUpstream(queueName);
}
// Federates queues under "Test.#" from server 1 to server 0, attaches one consumer each to
// Test.Q.1 and Test.Q.2 on server 0, and checks that messages produced to Test.Q.2 on server 1
// reach consumer2 both before and after consumer1's connection is closed.
@Test
public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
String connector = "server-pull-1";
getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
getServer(1).createQueue(new QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
getServer(1).createQueue(new QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration()
.setName("default")
.addFederationPolicy(new FederationQueuePolicyConfiguration()
.setName("myQueuePolicy")
.addInclude(new FederationQueuePolicyConfiguration.Matcher()
.setQueueMatch("#")
.setAddressMatch("Test.#")))
.addUpstreamConfiguration(new FederationUpstreamConfiguration()
.setName("server1-upstream")
.addPolicyRef("myQueuePolicy")
.setStaticConnectors(Collections.singletonList(connector))));
getServer(0).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(0);
ConnectionFactory cf2 = getCF(0);
ConnectionFactory cf3 = getCF(1);
try (Connection consumer1Connection = cf1.createConnection();
Connection consumer2Connection = cf2.createConnection();
Connection producerConnection = cf3.createConnection()) {
consumer1Connection.start();
Session session1 = consumer1Connection.createSession();
Queue queue1 = session1.createQueue("Test.Q.1");
MessageConsumer consumer1 = session1.createConsumer(queue1);
consumer2Connection.start();
Session session2 = consumer2Connection.createSession();
Queue queue2 = session2.createQueue("Test.Q.2");
MessageConsumer consumer2 = session2.createConsumer(queue2);
Session session3 = producerConnection.createSession();
MessageProducer producer = session3.createProducer(queue2);
producer.send(session3.createTextMessage("hello"));
assertNotNull(consumer2.receive(1000));
// closed explicitly mid-test; try-with-resources closes it again on exit
consumer1Connection.close();
producer.send(session3.createTextMessage("hello"));
assertNotNull(consumer2.receive(1000));
}
}
// Shared scenario: with one message on each broker, nothing is federated while the local
// queue (server 0) holds a message; once it is drained the upstream message arrives. A burst
// of 150 upstream messages is then expected to move only partially before all are consumed.
private void testFederatedQueuePullFromUpstream(final String queueName) throws Exception {
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection();
Connection connection0 = cf0.createConnection()) {
connection1.start();
Session session1 = connection1.createSession();
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer = session1.createProducer(queue1);
producer.send(session1.createTextMessage("1"));
connection0.start();
Session session0 = connection0.createSession();
Queue queue0 = session0.createQueue(queueName);
MessageProducer producer0 = session0.createProducer(queue1);
producer0.send(session0.createTextMessage("0"));
// no consumer upstream, messages locally, no pull
MessageConsumer consumer0 = session0.createConsumer(queue0);
// verify federated
waitForBindings(getServer(1), queueName, true, 1, 1, 2000);
// verify no federation of messages
Wait.assertEquals(1, () -> getMessageCount(getServer(0), queueName), 2000, 100);
Wait.assertEquals(1, () -> getMessageCount(getServer(1), queueName), 2000, 100);
// drain local queue
assertNotNull(consumer0.receive(1000));
// no messages locally, expect message federation now of a batch
// 4s b/c local queue size check is in seconds
assertNotNull(consumer0.receive(4000));
// verify all messages consumed
Wait.assertEquals(0L, () -> getMessageCount(getServer(0), queueName), 2000, 100);
Wait.assertEquals(0L, () -> getMessageCount(getServer(1), queueName), 2000, 100);
assertNull(consumer0.receiveNoWait());
// verify batch end
final int mumMessages = 150;
for (int i = 0; i < mumMessages; i++ ) {
producer.send(session1.createTextMessage("1-" + i));
}
// upstream has most
Wait.assertTrue(() -> getMessageCount(getServer(1), queueName) > 100, 2000, 200);
Wait.assertTrue(() -> getMessageCount(getServer(1), queueName) < mumMessages, 2000, 200);
// only a batch has been federated
Wait.assertTrue(() -> getMessageCount(getServer(0), queueName) > 10, 2000, 100);
Wait.assertTrue(() -> getMessageCount(getServer(0), queueName) < 100, 2000, 100);
// verify all available
for (int i = 0; i < mumMessages; i++ ) {
assertNotNull(consumer0.receive(4000));
}
assertNull(consumer0.receiveNoWait());
consumer0.close();
}
}
}

View File

@ -187,15 +187,15 @@ public class RingQueueTest extends ActiveMQTestBase {
m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
producer.send(m0);
Wait.assertTrue(() -> queue.getScheduledCount() == 1);
Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0);
Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 0);
time = System.currentTimeMillis();
time += 500;
m0.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
producer.send(m0);
Wait.assertTrue(() -> queue.getScheduledCount() == 2);
Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 0);
Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 0);
Wait.assertTrue(() -> queue.getMessagesReplaced() == 1);
Wait.assertTrue(() -> ((QueueImpl) queue).getMessageCountForRing() == 1);
Wait.assertTrue(() -> ((QueueImpl) queue).getPendingMessageCount() == 1);
}
@Test

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
@ -817,6 +818,15 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
return 0;
}
// Stub implementation: the handler is not stored and null is returned.
@Override
public ClientConsumer setManualFlowMessageHandler(MessageHandler handler) throws ActiveMQException {
return null;
}
// Stub implementation: intentionally does nothing.
@Override
public void resetIfSlowConsumer() {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal#getNonXAsession()
*/

View File

@ -881,6 +881,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return false;
}
// Stub implementation: always reports zero pending messages.
@Override
public long getPendingMessageCount() {
return 0;
}
@Override
public Executor getExecutor() {
// no-op