ARTEMIS-1073 Adding configuration for Producer's credits on AMQP

This commit is contained in:
Clebert Suconic 2017-03-27 15:36:41 -04:00
parent 2ef0d26015
commit dc25ff0e42
8 changed files with 72 additions and 15 deletions

View File

@ -1,3 +1,3 @@
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true</acceptor>
<acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>

View File

@ -61,10 +61,13 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
<global-max-size>100Mb</global-max-size>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true</acceptor>
<acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
</acceptors>

View File

@ -56,6 +56,10 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
private int amqpCredits = 100;
private int amqpLowCredits = 30;
/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
@ -105,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
}
String id = server.getConfiguration().getName();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor();
@ -137,6 +141,24 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
}
public int getAmqpCredits() {
return amqpCredits;
}
public ProtonProtocolManager setAmqpCredits(int amqpCredits) {
this.amqpCredits = amqpCredits;
return this;
}
public int getAmqpLowCredits() {
return amqpLowCredits;
}
public ProtonProtocolManager setAmqpLowCredits(int amqpLowCredits) {
this.amqpLowCredits = amqpLowCredits;
return this;
}
@Override
public boolean isProtocol(byte[] array) {
return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';

View File

@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool());
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
@ -74,13 +75,18 @@ public class AMQPConnectionContext extends ProtonInitializable {
protected LocalListener listener = new LocalListener();
public AMQPConnectionContext(AMQPConnectionCallback connectionSP,
private final ProtonProtocolManager protocolManager;
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
AMQPConnectionCallback connectionSP,
String containerId,
int idleTimeout,
int maxFrameSize,
int channelMax,
Executor dispatchExecutor,
ScheduledExecutorService scheduledPool) {
this.protocolManager = protocolManager;
this.connectionCallback = connectionSP;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
@ -240,6 +246,28 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.addEventHandler(eventHandler);
}
public ProtonProtocolManager getProtocolManager() {
return protocolManager;
}
public int getAmqpLowCredits() {
if (protocolManager != null) {
return protocolManager.getAmqpLowCredits();
} else {
// this is for tests only...
return 30;
}
}
public int getAmqpCredits() {
if (protocolManager != null) {
return protocolManager.getAmqpCredits();
} else {
// this is for tests only...
return 100;
}
}
// This listener will perform a bunch of things here
class LocalListener implements EventHandler {

View File

@ -149,7 +149,7 @@ public class AMQPSessionContext extends ProtonInitializable {
receiver.setContext(transactionHandler);
synchronized (connection.getLock()) {
receiver.open();
receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
receiver.flow(connection.getAmqpCredits());
}
}

View File

@ -53,10 +53,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
*/
private static int maxCreditAllocation = 100;
private final int amqpCredits;
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
private static int minCreditRefresh = 30;
private final int minCreditRefresh;
private TerminusExpiryPolicy expiryPolicy;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
@ -67,11 +67,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.protonSession = protonSession;
this.receiver = receiver;
this.sessionSPI = sessionSPI;
this.amqpCredits = connection.getAmqpCredits();
this.minCreditRefresh = connection.getAmqpLowCredits();
}
@Override
public void onFlow(int credits, boolean drain) {
flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
flow(Math.min(credits, amqpCredits), amqpCredits);
}
@Override
@ -119,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
}
flow(maxCreditAllocation, minCreditRefresh);
flow(amqpCredits, minCreditRefresh);
}
private RoutingType getRoutingType(Symbol[] symbols) {
@ -173,7 +175,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
synchronized (connection.getLock()) {
flow(maxCreditAllocation, minCreditRefresh);
flow(amqpCredits, minCreditRefresh);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);

View File

@ -42,8 +42,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
public static final int DEFAULT_COORDINATOR_CREDIT = 100;
public static final int CREDIT_LOW_WATERMARK = 30;
private final int amqpCredit;
private final int amqpLowMark;
final AMQPSessionCallback sessionSPI;
final AMQPConnectionContext connection;
@ -51,6 +51,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
this.sessionSPI = sessionSPI;
this.connection = connection;
this.amqpCredit = connection.getAmqpCredits();
this.amqpLowMark = connection.getAmqpLowCredits();
}
@Override
@ -68,8 +70,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
synchronized (connection.getLock()) {
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
receiver.flow(DEFAULT_COORDINATOR_CREDIT);
if (receiver.getCredit() < amqpLowMark) {
receiver.flow(amqpCredit);
}
buffer = new byte[delivery.available()];