diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
index 5b20b92a28..71f44b7112 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
@@ -1,3 +1,3 @@
- tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true
+ tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 5ca868780a..497b10dc88 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -61,10 +61,13 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
100Mb
+
+
+
- tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true
+ tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 2828cc18f8..03314b24de 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -56,6 +56,10 @@ public class ProtonProtocolManager implements ProtocolManager, Noti
private final Map prefixes = new HashMap<>();
+ private int amqpCredits = 100;
+
+ private int amqpLowCredits = 30;
+
/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
@@ -105,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti
}
String id = server.getConfiguration().getName();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor();
@@ -137,6 +141,24 @@ public class ProtonProtocolManager implements ProtocolManager, Noti
}
+ public int getAmqpCredits() {
+ return amqpCredits;
+ }
+
+ public ProtonProtocolManager setAmqpCredits(int amqpCredits) {
+ this.amqpCredits = amqpCredits;
+ return this;
+ }
+
+ public int getAmqpLowCredits() {
+ return amqpLowCredits;
+ }
+
+ public ProtonProtocolManager setAmqpLowCredits(int amqpLowCredits) {
+ this.amqpLowCredits = amqpLowCredits;
+ return this;
+ }
+
@Override
public boolean isProtocol(byte[] array) {
return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index b8851bb7e0..510fdad73b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 1c38942238..7994be4b22 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
@@ -74,13 +75,18 @@ public class AMQPConnectionContext extends ProtonInitializable {
protected LocalListener listener = new LocalListener();
- public AMQPConnectionContext(AMQPConnectionCallback connectionSP,
+ private final ProtonProtocolManager protocolManager;
+
+ public AMQPConnectionContext(ProtonProtocolManager protocolManager,
+ AMQPConnectionCallback connectionSP,
String containerId,
int idleTimeout,
int maxFrameSize,
int channelMax,
Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) {
+
+ this.protocolManager = protocolManager;
this.connectionCallback = connectionSP;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
@@ -240,6 +246,28 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.addEventHandler(eventHandler);
}
+ public ProtonProtocolManager getProtocolManager() {
+ return protocolManager;
+ }
+
+ public int getAmqpLowCredits() {
+ if (protocolManager != null) {
+ return protocolManager.getAmqpLowCredits();
+ } else {
+ // this is for tests only...
+ return 30;
+ }
+ }
+
+ public int getAmqpCredits() {
+ if (protocolManager != null) {
+ return protocolManager.getAmqpCredits();
+ } else {
+ // this is for tests only...
+ return 100;
+ }
+ }
+
// This listener will perform a bunch of things here
class LocalListener implements EventHandler {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 64b25311c3..c2c1f2d060 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -149,7 +149,7 @@ public class AMQPSessionContext extends ProtonInitializable {
receiver.setContext(transactionHandler);
synchronized (connection.getLock()) {
receiver.open();
- receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
+ receiver.flow(connection.getAmqpCredits());
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 596e93ac01..76ad1ace6d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -53,10 +53,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
*/
- private static int maxCreditAllocation = 100;
+ private final int amqpCredits;
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
- private static int minCreditRefresh = 30;
+ private final int minCreditRefresh;
private TerminusExpiryPolicy expiryPolicy;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
@@ -67,11 +67,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.protonSession = protonSession;
this.receiver = receiver;
this.sessionSPI = sessionSPI;
+ this.amqpCredits = connection.getAmqpCredits();
+ this.minCreditRefresh = connection.getAmqpLowCredits();
}
@Override
public void onFlow(int credits, boolean drain) {
- flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
+ flow(Math.min(credits, amqpCredits), amqpCredits);
}
@Override
@@ -119,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
}
- flow(maxCreditAllocation, minCreditRefresh);
+ flow(amqpCredits, minCreditRefresh);
}
private RoutingType getRoutingType(Symbol[] symbols) {
@@ -173,7 +175,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
synchronized (connection.getLock()) {
- flow(maxCreditAllocation, minCreditRefresh);
+ flow(amqpCredits, minCreditRefresh);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 12498b0890..a3dae25d0e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -42,8 +42,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
- public static final int DEFAULT_COORDINATOR_CREDIT = 100;
- public static final int CREDIT_LOW_WATERMARK = 30;
+ private final int amqpCredit;
+ private final int amqpLowMark;
final AMQPSessionCallback sessionSPI;
final AMQPConnectionContext connection;
@@ -51,6 +51,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
this.sessionSPI = sessionSPI;
this.connection = connection;
+ this.amqpCredit = connection.getAmqpCredits();
+ this.amqpLowMark = connection.getAmqpLowCredits();
}
@Override
@@ -68,8 +70,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
synchronized (connection.getLock()) {
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
- if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
- receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+ if (receiver.getCredit() < amqpLowMark) {
+ receiver.flow(amqpCredit);
}
buffer = new byte[delivery.available()];