From dc25ff0e42976b4fee507105cd64f0e9847a4ea4 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 27 Mar 2017 15:36:41 -0400 Subject: [PATCH] ARTEMIS-1073 Adding configuration for Producer's credits on AMQP --- .../cli/commands/etc/amqp-acceptor.txt | 2 +- .../artemis/cli/commands/etc/broker.xml | 5 +++- .../amqp/broker/ProtonProtocolManager.java | 24 ++++++++++++++- .../client/AMQPClientConnectionFactory.java | 2 +- .../amqp/proton/AMQPConnectionContext.java | 30 ++++++++++++++++++- .../amqp/proton/AMQPSessionContext.java | 2 +- .../proton/ProtonServerReceiverContext.java | 12 ++++---- .../transaction/ProtonTransactionHandler.java | 10 ++++--- 8 files changed, 72 insertions(+), 15 deletions(-) diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt index 5b20b92a28..71f44b7112 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt @@ -1,3 +1,3 @@ - tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true + tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index 5ca868780a..497b10dc88 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -61,10 +61,13 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings} 100Mb + + + - tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true + tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 ${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 2828cc18f8..03314b24de 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -56,6 +56,10 @@ public class ProtonProtocolManager implements ProtocolManager, Noti private final Map prefixes = new HashMap<>(); + private int amqpCredits = 100; + + private int amqpLowCredits = 30; + /* * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for * the address. This can be changed on the acceptor. @@ -105,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } String id = server.getConfiguration().getName(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); Executor executor = server.getExecutorFactory().getExecutor(); @@ -137,6 +141,24 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } + public int getAmqpCredits() { + return amqpCredits; + } + + public ProtonProtocolManager setAmqpCredits(int amqpCredits) { + this.amqpCredits = amqpCredits; + return this; + } + + public int getAmqpLowCredits() { + return amqpLowCredits; + } + + public ProtonProtocolManager setAmqpLowCredits(int amqpLowCredits) { + this.amqpLowCredits = amqpLowCredits; + return this; + } + @Override public boolean isProtocol(byte[] array) { return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P'; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java index b8851bb7e0..510fdad73b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java @@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory { Executor executor = server.getExecutorFactory().getExecutor(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool()); eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 1c38942238..7994be4b22 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; @@ -74,13 +75,18 @@ public class AMQPConnectionContext extends ProtonInitializable { protected LocalListener listener = new LocalListener(); - public AMQPConnectionContext(AMQPConnectionCallback connectionSP, + private final ProtonProtocolManager protocolManager; + + public AMQPConnectionContext(ProtonProtocolManager protocolManager, + AMQPConnectionCallback connectionSP, String containerId, int idleTimeout, int maxFrameSize, int channelMax, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + + this.protocolManager = protocolManager; this.connectionCallback = connectionSP; this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); @@ -240,6 +246,28 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.addEventHandler(eventHandler); } + public ProtonProtocolManager getProtocolManager() { + return protocolManager; + } + + public int getAmqpLowCredits() { + if (protocolManager != null) { + return protocolManager.getAmqpLowCredits(); + } else { + // this is for tests only... + return 30; + } + } + + public int getAmqpCredits() { + if (protocolManager != null) { + return protocolManager.getAmqpCredits(); + } else { + // this is for tests only... + return 100; + } + } + // This listener will perform a bunch of things here class LocalListener implements EventHandler { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index 64b25311c3..c2c1f2d060 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -149,7 +149,7 @@ public class AMQPSessionContext extends ProtonInitializable { receiver.setContext(transactionHandler); synchronized (connection.getLock()) { receiver.open(); - receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT); + receiver.flow(connection.getAmqpCredits()); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 596e93ac01..76ad1ace6d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -53,10 +53,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. */ - private static int maxCreditAllocation = 100; + private final int amqpCredits; // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. - private static int minCreditRefresh = 30; + private final int minCreditRefresh; private TerminusExpiryPolicy expiryPolicy; public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, @@ -67,11 +67,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements this.protonSession = protonSession; this.receiver = receiver; this.sessionSPI = sessionSPI; + this.amqpCredits = connection.getAmqpCredits(); + this.minCreditRefresh = connection.getAmqpLowCredits(); } @Override public void onFlow(int credits, boolean drain) { - flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation); + flow(Math.min(credits, amqpCredits), amqpCredits); } @Override @@ -119,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } } - flow(maxCreditAllocation, minCreditRefresh); + flow(amqpCredits, minCreditRefresh); } private RoutingType getRoutingType(Symbol[] symbols) { @@ -173,7 +175,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); synchronized (connection.getLock()) { - flow(maxCreditAllocation, minCreditRefresh); + flow(amqpCredits, minCreditRefresh); } } catch (Exception e) { log.warn(e.getMessage(), e); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 12498b0890..a3dae25d0e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -42,8 +42,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class); - public static final int DEFAULT_COORDINATOR_CREDIT = 100; - public static final int CREDIT_LOW_WATERMARK = 30; + private final int amqpCredit; + private final int amqpLowMark; final AMQPSessionCallback sessionSPI; final AMQPConnectionContext connection; @@ -51,6 +51,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { this.sessionSPI = sessionSPI; this.connection = connection; + this.amqpCredit = connection.getAmqpCredits(); + this.amqpLowMark = connection.getAmqpLowCredits(); } @Override @@ -68,8 +70,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { synchronized (connection.getLock()) { // Replenish coordinator receiver credit on exhaustion so sender can continue // transaction declare and discahrge operations. - if (receiver.getCredit() < CREDIT_LOW_WATERMARK) { - receiver.flow(DEFAULT_COORDINATOR_CREDIT); + if (receiver.getCredit() < amqpLowMark) { + receiver.flow(amqpCredit); } buffer = new byte[delivery.available()];