ARTEMIS-636 Implement AMQP AddressFull BLOCK

This commit is contained in:
Martyn Taylor 2016-07-18 14:08:41 +01:00 committed by Andy Taylor
parent 5dfa1c59fb
commit 4d60ced581
10 changed files with 284 additions and 35 deletions
artemis-protocols
artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug
artemis-proton-plug/src
docs/user-manual/en
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton

View File

@ -20,33 +20,37 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
@ -66,7 +70,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
private final Connection transportConnection;
private ServerSession serverSession;
private AMQPSessionContext protonSession;
@ -347,13 +350,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
recoverContext();
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK) {
ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + message.getAddress());
Rejected rejected = new Rejected();
rejected.setError(ec);
delivery.disposition(rejected);
connection.flush();
}
else {
serverSend(message, delivery, receiver);
}
}
private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
try {
serverSession.send(message, false);
// FIXME Potential race here...
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
synchronized (connection.getLock()) {
delivery.disposition(Accepted.getInstance());
delivery.settle();
connection.flush();
}
@ -378,6 +396,24 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
return manager.getPubSubPrefix();
}
@Override
public void offerProducerCredit(final String address, final int credits, final int threshold, final Receiver receiver) {
try {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
store.checkMemory(new Runnable() {
@Override
public void run() {
if (receiver.getRemoteCredit() < threshold) {
receiver.flow(credits);
}
}
});
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteQueue(String address) throws Exception {
manager.getServer().destroyQueue(new SimpleString(address));

View File

@ -44,6 +44,8 @@ public interface AMQPSessionCallback {
void createDurableQueue(String address, String queueName) throws Exception;
void offerProducerCredit(String address, int credits, int threshold, Receiver receiver);
void deleteQueue(String address) throws Exception;
boolean queueQuery(String queueName) throws Exception;

View File

@ -39,8 +39,8 @@ import org.proton.plug.handler.ProtonHandler;
import org.proton.plug.handler.impl.DefaultEventHandler;
import org.proton.plug.util.ByteUtil;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext {

View File

@ -57,14 +57,13 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
close(false);
}
public void flow(int credits) {
public void flow(int credits, int threshold) {
synchronized (connection.getLock()) {
receiver.flow(credits);
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
}
connection.flush();
}
public void drain(int credits) {
synchronized (connection.getLock()) {
receiver.drain(credits);

View File

@ -84,4 +84,9 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i
return queues.poll(time, unit);
}
@Override
public void flow(int credits) {
flow(credits, Integer.MAX_VALUE);
}
}

View File

@ -19,7 +19,6 @@ package org.proton.plug.context.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
@ -39,7 +38,14 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
private final int numberOfCredits = 100;
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
*/
private static int maxCreditAllocation = 100;
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
private static int minCreditRefresh = 30;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
AbstractConnectionContext connection,
@ -50,6 +56,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
@Override
public void onFlow(int credits, boolean drain) {
flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
}
@Override
@ -86,10 +93,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage());
}
}
}
flow(numberOfCredits);
flow(maxCreditAllocation, minCreditRefresh);
}
/*
@ -117,12 +124,8 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
receiver.advance();
sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer);
delivery.disposition(Accepted.getInstance());
delivery.settle();
if (receiver.getRemoteCredit() < numberOfCredits / 2) {
flow(numberOfCredits);
}
flow(maxCreditAllocation, minCreditRefresh);
}
}
finally {

View File

@ -26,6 +26,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.AmqpError;
@ -40,11 +41,10 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.context.AbstractConnectionContext;
import org.proton.plug.context.AbstractProtonContextSender;
import org.proton.plug.context.AbstractProtonSessionContext;
import org.proton.plug.context.ProtonPlugSender;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
import org.proton.plug.context.ProtonPlugSender;
import org.apache.qpid.proton.amqp.messaging.Source;
import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.proton.plug.AmqpSupport.findFilter;

View File

@ -27,9 +27,9 @@ import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.context.ProtonPlugSender;
import org.proton.plug.context.server.ProtonServerSessionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.util.ProtonServerMessage;
public class MinimalSessionSPI implements AMQPSessionCallback {
@ -75,6 +75,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
public void offerProducerCredit(String address, int credits, int threshold, Receiver receiver) {
}
@Override
public void createTemporaryQueue(String address, String queueName) throws Exception {

View File

@ -273,6 +273,28 @@ control.
> a misbehaving client to ignore the flow control credits issued by the broker
> and continue sending with out sufficient credit.
#### Blocking producer window based flow control using AMQP
Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support
flow control. Artemis CORE protocol and AMQP. Both protocols implement flow
control slightly differently and therefore address full BLOCK policy behaves
slightly different for clients uses each protocol respectively.
As explained earlier in this chapter the CORE protocol uses a producer window size
flow control system. Where credits (representing bytes) are allocated to producers,
if a producer wants to send a message it should wait until it has enough bytes available
to send it. AMQP flow control credits are not representative of bytes but instead represent
the number of messages a producer is permitted to send (regardless of size).
BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis
will issue 100 credits to a client at a time and refresh them when the clients credits reaches 30.
The broker will stop issuing credits once an address is full. However, since AMQP credits represent
whole messages and not bytes, it would be possible for an AMQP client to significantly exceed an
address upper bound should the broker continue accepting messages until the clients credits are exhausted.
For this reason once an address has reached it's upper bound and is blocked (when using AMQP) Artemis
will start rejecting messages until the address becomes unblocked. This should be taken into consideration when writing
application code.
### Rate limited flow control
Apache ActiveMQ Artemis also allows the rate a producer can emit message to be limited,

View File

@ -29,11 +29,15 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.QueueBrowser;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -48,9 +52,17 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Properties;
@ -66,12 +78,21 @@ import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.AMQPClientReceiverContext;
import org.proton.plug.AMQPClientSenderContext;
import org.proton.plug.AMQPClientSessionContext;
import org.proton.plug.context.server.ProtonServerReceiverContext;
import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@RunWith(Parameterized.class)
public class ProtonTest extends ActiveMQTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String userName = "guest";
private static final String password = "guest";
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
@ -106,6 +127,7 @@ public class ProtonTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "5672");
@ -113,6 +135,12 @@ public class ProtonTest extends ActiveMQTestBase {
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.start();
server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
@ -167,7 +195,7 @@ public class ProtonTest extends ActiveMQTestBase {
maxCreditAllocation.setInt(null, 1);
String destinationAddress = address + 1;
AmqpClient client = new AmqpClient(new URI("tcp://localhost:5672"), userName, password);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
@ -197,9 +225,158 @@ public class ProtonTest extends ActiveMQTestBase {
message = (TextMessage) cons.receive(5000);
Assert.assertNotNull(message);
}
@Test
public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
fillAddress(address + 1);
}
@Test
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
String destinationAddress = address + 1;
fillAddress(destinationAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Exception e = null;
try {
Destination d = session.createQueue(destinationAddress);
MessageProducer p = session.createProducer(d);
p.send(session.createBytesMessage());
}
catch (ResourceAllocationException rae) {
e = rae;
}
assertTrue(e instanceof ResourceAllocationException);
assertTrue(e.getMessage().contains("resource-limit-exceeded"));
}
@Test
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
// Only allow 1 credit to be submitted at a time.
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
maxCreditAllocation.setAccessible(true);
int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
maxCreditAllocation.setInt(null, 1);
String destinationAddress = address + 1;
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress);
sender.setSendTimeout(1000);
sendUntilFull(sender);
assertTrue(sender.getSender().getCredit() <= 0);
}
finally {
amqpConnection.close();
maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
}
}
@Test
public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
String destinationAddress = address + 1;
int messagesSent = fillAddress(destinationAddress);
AmqpConnection amqpConnection = null;
try {
amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress);
// Wait for a potential flow frame.
Thread.sleep(500);
assertEquals(0, sender.getSender().getCredit());
// Empty Address except for 1 message used later.
AmqpReceiver receiver = session.createReceiver(destinationAddress);
receiver.flow(100);
AmqpMessage m;
for (int i = 0; i < messagesSent - 1; i++) {
m = receiver.receive();
m.accept();
}
// Wait for address to unblock and flow frame to arrive
Thread.sleep(500);
assertTrue(sender.getSender().getCredit() > 0);
assertNotNull(receiver.receive());
}
finally {
amqpConnection.close();
}
}
@Test
public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
fillAddress(address + 1);
AmqpConnection amqpConnection = null;
try {
amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address + 1);
// Wait for a potential flow frame.
Thread.sleep(1000);
assertEquals(0, sender.getSender().getCredit());
}
finally {
amqpConnection.close();
}
}
/**
* Fills an address. Careful when using this method. Only use when rejected messages are switched on.
* @param address
* @return
* @throws Exception
*/
private int fillAddress(String address) throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
return sendUntilFull(sender);
}
finally {
amqpConnection.close();
}
}
private int sendUntilFull(AmqpSender sender) throws IOException {
AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024];
int sentMessages = 0;
int maxMessages = 50;
Exception e = null;
try {
for (int i = 0; i < maxMessages; i++) {
message.setBytes(payload);
sender.send(message);
sentMessages++;
}
}
catch (IOException ioe) {
e = ioe;
}
assertNotNull(e);
assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded"));
return sentMessages;
}
@Test
public void testReplyTo() throws Throwable {
@ -918,7 +1095,7 @@ public class ProtonTest extends ActiveMQTestBase {
private javax.jms.Connection createConnection() throws JMSException {
Connection connection;
if (protocol == 3) {
factory = new JmsConnectionFactory("amqp://localhost:5672");
factory = new JmsConnectionFactory(amqpConnectionUri);
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -929,7 +1106,7 @@ public class ProtonTest extends ActiveMQTestBase {
connection.start();
}
else if (protocol == 0) {
factory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -950,7 +1127,7 @@ public class ProtonTest extends ActiveMQTestBase {
factory = new ActiveMQConnectionFactory();
}
connection = factory.createConnection("guest", "guest");
connection = factory.createConnection(userName, password);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {