diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt index 743ba95a4b..5f460c0217 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt @@ -1,3 +1,3 @@ - tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 diff --git a/artemis-features/src/main/resources/artemis.xml b/artemis-features/src/main/resources/artemis.xml index 5745f6eee2..3d685a5555 100644 --- a/artemis-features/src/main/resources/artemis.xml +++ b/artemis-features/src/main/resources/artemis.xml @@ -119,7 +119,7 @@ under the License. tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 26e188b368..c1a92e0102 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -62,9 +62,9 @@ public class ProtonProtocolManager extends AbstractProtocolManager prefixes = new HashMap<>(); - private int amqpCredits = 100; + private int amqpCredits = AmqpSupport.AMQP_CREDITS_DEFAULT; - private int amqpLowCredits = 30; + private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; private int initialRemoteMaxFrameSize = 4 * 1024; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 4788d0d436..886bb2d6e6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -282,7 +282,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH return protocolManager.getAmqpLowCredits(); } else { // this is for tests only... - return 30; + return AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; } } @@ -291,7 +291,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH return protocolManager.getAmqpCredits(); } else { // this is for tests only... - return 100; + return AmqpSupport.AMQP_CREDITS_DEFAULT; } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 57714ad0cb..940a746936 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -28,6 +28,10 @@ import org.apache.qpid.proton.amqp.UnsignedLong; */ public class AmqpSupport { + // Default thresholds/values used for granting credit to producers + public static final int AMQP_CREDITS_DEFAULT = 1000; + public static final int AMQP_LOW_CREDITS_DEFAULT = 300; + // Identification values used to locating JMS selector types. public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml index 202dbba691..10459f22a6 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config-wrong-address.xml @@ -133,7 +133,7 @@ under the License. tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true @@ -209,4 +209,4 @@ under the License. - \ No newline at end of file + diff --git a/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml index 4ac665c1f5..50d5e8dc41 100644 --- a/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/standard/broker-plugin/src/main/resources/activemq/server0/broker.xml @@ -120,7 +120,7 @@ under the License. tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 01e22886cf..c77184fead 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -166,6 +166,7 @@ public class AmqpConnection extends AmqpAbstractResource implements } protonTransport.setMaxFrameSize(getMaxFrameSize()); protonTransport.setChannelMax(getChannelMax()); + protonTransport.setEmitFlowEventOnSend(false); protonTransport.bind(getEndpoint()); Sasl sasl = protonTransport.sasl(); if (sasl != null) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 846739a23d..703d4897c7 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -403,6 +403,14 @@ public class AmqpSender extends AmqpAbstractResource { } } + private void doCreditInspection() { + try { + getStateInspector().inspectCredit(getSender()); + } catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + @Override protected Exception getOpenAbortException() { // Verify the attach response contained a non-null target @@ -479,6 +487,13 @@ public class AmqpSender extends AmqpAbstractResource { } } + @Override + public void processFlowUpdates(AmqpConnection connection) throws IOException { + LOG.trace("Sender {} flow update, credit = {}", getEndpoint().getCredit()); + + doCreditInspection(); + } + @Override public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException { List toRemove = new ArrayList<>(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java index 291c690f1a..7c2fe8a692 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java @@ -80,6 +80,10 @@ public class AmqpValidator { } + public void inspectCredit(Sender sender) { + + } + public boolean isValid() { return this.errorMessage.get() == null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java index 03db6d788b..d58ccb38e8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java @@ -58,7 +58,7 @@ public class AmqpFlowControlTest extends JMSClientTestSupport { @Override protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { - server.getConfiguration().addAcceptorConfiguration("flow", singleCreditAcceptorURI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1"); + server.getConfiguration().addAcceptorConfiguration("flow", singleCreditAcceptorURI + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpLowCredits=1"); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java index fe470c6284..e10bc7d2bf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java @@ -18,8 +18,10 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -179,4 +181,62 @@ public class AmqpSenderTest extends AmqpClientTestSupport { sender.close(); connection.close(); } + + @Test(timeout = 60000) + public void testSenderCreditReplenishment() throws Exception { + AtomicInteger counter = new AtomicInteger(); + CountDownLatch initialCredit = new CountDownLatch(1); + CountDownLatch refreshedCredit = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(guestUser, guestPass); + client.setValidator(new AmqpValidator() { + @Override + public void inspectCredit(Sender sender) { + int count = counter.incrementAndGet(); + switch (count) { + case 1: + assertEquals("Unexpected initial credit", AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit()); + initialCredit.countDown(); + break; + case 2: + assertEquals("Unexpected replenished credit", AmqpSupport.AMQP_LOW_CREDITS_DEFAULT + AmqpSupport.AMQP_CREDITS_DEFAULT, sender.getCredit()); + refreshedCredit.countDown(); + break; + default: + throw new IllegalStateException("Unexpected additional flow: " + count); + } + } + }); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + + // Wait for initial credit to arrive and be checked + assertTrue("Expected credit did not arrive", initialCredit.await(3000, TimeUnit.MILLISECONDS)); + + // Send just enough messages not to cause credit replenishment + final int msgCount = AmqpSupport.AMQP_CREDITS_DEFAULT - AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; + for (int i = 1; i <= msgCount - 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message: " + i); + sender.send(message); + } + + // Wait and check more credit hasn't flowed yet + assertFalse("Expected credit not to have been refreshed yet", refreshedCredit.await(50, TimeUnit.MILLISECONDS)); + + // Send a final message needed to provoke the replenishment flow, wait for to arrive + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message: " + msgCount); + sender.send(message); + + assertTrue("Expected credit refresh did not occur", refreshedCredit.await(3000, TimeUnit.MILLISECONDS)); + + connection.close(); + } finally { + connection.getStateInspector().assertValid(); + } + } } \ No newline at end of file diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml index e1451df910..a481c3f64b 100644 --- a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml @@ -106,7 +106,7 @@ under the License. tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpMinCredits=300 + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300 tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true diff --git a/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml b/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml index c318037b7a..22ca563441 100644 --- a/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml @@ -117,7 +117,7 @@ under the License. tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true @@ -196,4 +196,4 @@ under the License. - \ No newline at end of file +