ARTEMIS-1862: fix 'amqpLowCredits' XML config, update related code defaults

This commit is contained in:
Robbie Gemmell 2018-05-11 21:07:02 +01:00
parent 30cac20902
commit f0bac1bf18
14 changed files with 97 additions and 13 deletions

View File

@ -1,3 +1,3 @@
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

View File

@ -119,7 +119,7 @@ under the License.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. --> <!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor> <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

View File

@ -62,9 +62,9 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
private int amqpCredits = 100; private int amqpCredits = AmqpSupport.AMQP_CREDITS_DEFAULT;
private int amqpLowCredits = 30; private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
private int initialRemoteMaxFrameSize = 4 * 1024; private int initialRemoteMaxFrameSize = 4 * 1024;

View File

@ -282,7 +282,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return protocolManager.getAmqpLowCredits(); return protocolManager.getAmqpLowCredits();
} else { } else {
// this is for tests only... // this is for tests only...
return 30; return AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
} }
} }
@ -291,7 +291,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return protocolManager.getAmqpCredits(); return protocolManager.getAmqpCredits();
} else { } else {
// this is for tests only... // this is for tests only...
return 100; return AmqpSupport.AMQP_CREDITS_DEFAULT;
} }
} }

View File

@ -28,6 +28,10 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
*/ */
public class AmqpSupport { public class AmqpSupport {
// Default thresholds/values used for granting credit to producers
public static final int AMQP_CREDITS_DEFAULT = 1000;
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
// Identification values used to locating JMS selector types. // Identification values used to locating JMS selector types.
public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");

View File

@ -133,7 +133,7 @@ under the License.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. --> <!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor> <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
@ -209,4 +209,4 @@ under the License.
</addresses> </addresses>
</core> </core>
</configuration> </configuration>

View File

@ -120,7 +120,7 @@ under the License.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. --> <!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor> <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

View File

@ -166,6 +166,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
} }
protonTransport.setMaxFrameSize(getMaxFrameSize()); protonTransport.setMaxFrameSize(getMaxFrameSize());
protonTransport.setChannelMax(getChannelMax()); protonTransport.setChannelMax(getChannelMax());
protonTransport.setEmitFlowEventOnSend(false);
protonTransport.bind(getEndpoint()); protonTransport.bind(getEndpoint());
Sasl sasl = protonTransport.sasl(); Sasl sasl = protonTransport.sasl();
if (sasl != null) { if (sasl != null) {

View File

@ -403,6 +403,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
} }
} }
private void doCreditInspection() {
try {
getStateInspector().inspectCredit(getSender());
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
@Override @Override
protected Exception getOpenAbortException() { protected Exception getOpenAbortException() {
// Verify the attach response contained a non-null target // Verify the attach response contained a non-null target
@ -479,6 +487,13 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
} }
} }
@Override
public void processFlowUpdates(AmqpConnection connection) throws IOException {
LOG.trace("Sender {} flow update, credit = {}", getEndpoint().getCredit());
doCreditInspection();
}
@Override @Override
public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException { public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException {
List<Delivery> toRemove = new ArrayList<>(); List<Delivery> toRemove = new ArrayList<>();

View File

@ -80,6 +80,10 @@ public class AmqpValidator {
} }
public void inspectCredit(Sender sender) {
}
public boolean isValid() { public boolean isValid() {
return this.errorMessage.get() == null; return this.errorMessage.get() == null;
} }

View File

@ -58,7 +58,7 @@ public class AmqpFlowControlTest extends JMSClientTestSupport {
@Override @Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
server.getConfiguration().addAcceptorConfiguration("flow", singleCreditAcceptorURI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1"); server.getConfiguration().addAcceptorConfiguration("flow", singleCreditAcceptorURI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpLowCredits=1");
} }
@Override @Override

View File

@ -18,8 +18,10 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -179,4 +181,62 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
sender.close(); sender.close();
connection.close(); connection.close();
} }
@Test(timeout = 60000)
public void testSenderCreditReplenishment() throws Exception {
AtomicInteger counter = new AtomicInteger();
CountDownLatch initialCredit = new CountDownLatch(1);
CountDownLatch refreshedCredit = new CountDownLatch(1);
AmqpClient client = createAmqpClient(guestUser, guestPass);
client.setValidator(new AmqpValidator() {
@Override
public void inspectCredit(Sender sender) {
int count = counter.incrementAndGet();
switch (count) {
case 1:
assertEquals("Unexpected initial credit", AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit());
initialCredit.countDown();
break;
case 2:
assertEquals("Unexpected replenished credit", AmqpSupport.AMQP_LOW_CREDITS_DEFAULT + AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit());
refreshedCredit.countDown();
break;
default:
throw new IllegalStateException("Unexpected additional flow: " + count);
}
}
});
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Wait for initial credit to arrive and be checked
assertTrue("Expected credit did not arrive", initialCredit.await(3000, TimeUnit.MILLISECONDS));
// Send just enough messages not to cause credit replenishment
final int msgCount = AmqpSupport.AMQP_CREDITS_DEFAULT - AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
for (int i = 1; i <= msgCount - 1; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message: " + i);
sender.send(message);
}
// Wait and check more credit hasn't flowed yet
assertFalse("Expected credit not to have been refreshed yet", refreshedCredit.await(50, TimeUnit.MILLISECONDS));
// Send a final message needed to provoke the replenishment flow, wait for to arrive
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message: " + msgCount);
sender.send(message);
assertTrue("Expected credit refresh did not occur", refreshedCredit.await(3000, TimeUnit.MILLISECONDS));
connection.close();
} finally {
connection.getStateInspector().assertValid();
}
}
} }

View File

@ -106,7 +106,7 @@ under the License.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. --> <!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true</acceptor> <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true</acceptor>

View File

@ -117,7 +117,7 @@ under the License.
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. --> <!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor> <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
@ -196,4 +196,4 @@ under the License.
</addresses> </addresses>
</core> </core>
</configuration> </configuration>