ARTEMIS-4003 Fixing credit starve on Large Message over the bridge or clustering

This commit is contained in:
Clebert Suconic 2022-09-19 09:50:16 -04:00 committed by clebertsuconic
parent 33abbbc4b8
commit 7bf1193380
4 changed files with 209 additions and 6 deletions

View File

@ -21,12 +21,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.jboss.logging.Logger;
public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {
private static final Logger logger = Logger.getLogger(AbstractProducerCreditsImpl.class);
protected int pendingCredits;
private final int windowSize;
protected final int windowSize;
protected volatile boolean closed;
@ -81,7 +84,9 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
}
protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
// check to see if the blocking mode is FAIL on the server
if (logger.isDebugEnabled()) {
logger.debugf("AfterAcquire %s credits on address %s", credits, address);
}
synchronized (this) {
pendingCredits -= credits;
}
@ -112,6 +117,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
@Override
public synchronized void reset() {
logger.debugf("reset credits on address %s", address);
// Any pendingCredits credits from before failover won't arrive, so we re-initialise
int beforeFailure = pendingCredits;
@ -144,6 +150,9 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
protected void checkCredits(final int credits) {
int needed = Math.max(credits, windowSize);
if (logger.isTraceEnabled()) {
logger.tracef("CheckCredits %s on address %s, needed=%s, credits=%s, window=%s", credits, address, needed, credits, windowSize);
}
int toRequest = -1;
@ -151,17 +160,32 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
if (getBalance() + arriving < needed) {
toRequest = needed - arriving;
pendingCredits += toRequest;
arriving += toRequest;
if (logger.isTraceEnabled()) {
logger.tracef("CheckCredits on Address %s, requesting=%s, arriving=%s, balance=%s", address, toRequest, arriving, getBalance());
}
} else {
if (logger.isTraceEnabled()) {
logger.tracef("CheckCredits did not need it, balance=%s, arriving=%s, needed=%s, getbalance + arriving < needed=%s", getBalance(), arriving, needed, (boolean)(getBalance() + arriving < needed));
}
}
}
if (toRequest != -1) {
if (toRequest > 0) {
if (logger.isDebugEnabled()) {
logger.debugf("Requesting %s credits on address %s, needed = %s, arriving = %s", toRequest, address, needed, arriving);
}
requestCredits(toRequest);
} else {
logger.debugf("not asking for %s credits on %s", toRequest, address);
}
}
private void requestCredits(final int credits) {
protected void requestCredits(final int credits) {
logger.debugf("Request %s credits on address %s", credits, address);
synchronized (this) {
pendingCredits += credits;
arriving += credits;
}
session.sendProducerCreditsMessage(credits, address);
}
}

View File

@ -18,9 +18,12 @@
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.jboss.logging.Logger;
public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl {
private static final Logger logger = Logger.getLogger(AsynchronousProducerCreditsImpl.class);
int balance;
final ClientProducerFlowCallback callback;
@ -36,6 +39,9 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl
protected synchronized void actualAcquire(int credits) {
synchronized (this) {
balance -= credits;
if (logger.isDebugEnabled()) {
logger.debugf("actualAcquire on address %s with credits=%s, balance=%s, callbackType=%s", address, credits, balance, callback.getClass());
}
if (balance <= 0) {
callback.onCreditsFlow(true, this);
}
@ -53,7 +59,17 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl
synchronized (this) {
super.receiveCredits(credits);
balance += credits;
if (logger.isDebugEnabled()) {
logger.debugf("receiveCredits with credits=%s, balance=%s, arriving=%s, callbackType=%s", credits, balance, arriving, callback.getClass());
}
callback.onCreditsFlow(balance <= 0, this);
if (balance < 0 && arriving == 0) {
// there are no more credits arriving and we are still negative, async large message send asked too much and we need to counter balance
logger.debugf("Starve credits counter balance");
int request = -balance + windowSize * 2;
requestCredits(request);
}
}
}
@ -62,6 +78,9 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl
@Override
public void receiveFailCredits(final int credits) {
super.receiveFailCredits(credits);
if (logger.isDebugEnabled()) {
logger.debugf("creditsFail %s, callback=%s", credits, callback.getClass());
}
callback.onCreditsFail(this);
}
@ -70,6 +89,9 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl
synchronized (this) {
balance = 0;
callback.onCreditsFlow(true, this);
if (logger.isDebugEnabled()) {
logger.debugf("releaseOutstanding credits, balance=%s, callback=%s", balance, callback.getClass());
}
}
}

View File

@ -1132,6 +1132,63 @@
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-clustered-large-message1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>true</noWeb>
<instance>${basedir}/target/clusteredLargeMessage/cluster1</instance>
<configuration>${basedir}/target/classes/servers/clusteredLargeMessage/cluster1</configuration>
<args>
<arg>--name</arg>
<arg>cluster1</arg>
<arg>--clustered</arg>
<arg>--staticCluster</arg>
<arg>tcp://localhost:61716</arg>
<arg>--max-hops</arg>
<arg>1</arg>
<arg>--queues</arg>
<arg>testQueue</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-cluster-large-message2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>true</noWeb>
<instance>${basedir}/target/clusteredLargeMessage/cluster2</instance>
<configuration>${basedir}/target/classes/servers/clusteredLargeMessage/cluster2</configuration>
<args>
<arg>--name</arg>
<arg>cluster2</arg>
<arg>--clustered</arg>
<arg>--staticCluster</arg>
<arg>tcp://localhost:61616</arg>
<arg>--max-hops</arg>
<arg>1</arg>
<arg>--port-offset</arg>
<arg>100</arg>
<arg>--queues</arg>
<arg>testQueue</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-create-nettynative</id>

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.clusteredLargeMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ClusteredLargeMessageTest extends SmokeTestBase {
public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
Process server1Process;
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
server1Process = startServer(SERVER_NAME_0, 0, 30000);
startServer(SERVER_NAME_1, 100, 30000);
}
@Test
public void testLargeMessage() throws Exception {
// I'm calling all 3 here as I want to run all of these with a single server start
// without having to deal with beforeClass and afterClass on this test
internalTestLargeMessge("CORE");
internalTestLargeMessge("AMQP");
internalTestLargeMessge("OPENWIRE");
}
private void internalTestLargeMessge(String protocol) throws Exception {
ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61716");
Connection connection2 = server2CF.createConnection();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue2 = session2.createQueue("testQueue");
MessageConsumer consumer2 = session2.createConsumer(queue2);
connection2.start();
ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
Connection connection1 = server1CF.createConnection();
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue1 = session1.createQueue("testQueue");
MessageProducer producer1 = session1.createProducer(queue1);
String largeBody;
{
StringBuffer largeBodyBuffer = new StringBuffer();
while (largeBodyBuffer.length() < 2_000_000) {
largeBodyBuffer.append("This is large ");
}
largeBody = largeBodyBuffer.toString();
}
for (int i = 0; i < 10; i++) {
TextMessage message = session1.createTextMessage(largeBody);
message.setStringProperty("i", Integer.toString(i));
producer1.send(message);
}
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer2.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(largeBody, message.getText());
}
connection1.close();
connection2.close();
}
}