diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 00f5e3fc89..ab57fe170c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -20,33 +20,37 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SelectorTranslator; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.ProtonJMessage; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager; -import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.ServerSession; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -import org.apache.activemq.artemis.utils.ByteUtil; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; -import org.apache.activemq.artemis.utils.UUIDGenerator; import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionContext; @@ -66,7 +70,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private final Connection transportConnection; - private ServerSession serverSession; private AMQPSessionContext protonSession; @@ -347,13 +350,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se recoverContext(); + PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress()); + if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK) { + ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + message.getAddress()); + Rejected rejected = new Rejected(); + rejected.setError(ec); + delivery.disposition(rejected); + connection.flush(); + } + else { + serverSend(message, delivery, receiver); + } + } + + private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception { try { serverSession.send(message, false); - + // FIXME Potential race here... manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() { @Override public void done() { synchronized (connection.getLock()) { + delivery.disposition(Accepted.getInstance()); delivery.settle(); connection.flush(); } @@ -378,6 +396,24 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se return manager.getPubSubPrefix(); } + @Override + public void offerProducerCredit(final String address, final int credits, final int threshold, final Receiver receiver) { + try { + final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address)); + store.checkMemory(new Runnable() { + @Override + public void run() { + if (receiver.getRemoteCredit() < threshold) { + receiver.flow(credits); + } + } + }); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public void deleteQueue(String address) throws Exception { manager.getServer().destroyQueue(new SimpleString(address)); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java index bb53791270..637b5382da 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -44,6 +44,8 @@ public interface AMQPSessionCallback { void createDurableQueue(String address, String queueName) throws Exception; + void offerProducerCredit(String address, int credits, int threshold, Receiver receiver); + void deleteQueue(String address) throws Exception; boolean queueQuery(String queueName) throws Exception; diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index d6269e85e8..fa949d3652 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -39,8 +39,8 @@ import org.proton.plug.handler.ProtonHandler; import org.proton.plug.handler.impl.DefaultEventHandler; import org.proton.plug.util.ByteUtil; -import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT; import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX; +import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT; import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE; public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index 4343b017e2..5a430293cd 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -57,14 +57,13 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable close(false); } - public void flow(int credits) { + public void flow(int credits, int threshold) { synchronized (connection.getLock()) { - receiver.flow(credits); + sessionSPI.offerProducerCredit(address, credits, threshold, receiver); } connection.flush(); } - public void drain(int credits) { synchronized (connection.getLock()) { receiver.drain(credits); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java index 884af60a38..c06ae58d24 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java @@ -84,4 +84,9 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext i return queues.poll(time, unit); } + @Override + public void flow(int credits) { + flow(credits, Integer.MAX_VALUE); + } + } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index aa04cefe0b..7d39bb7a1b 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -19,7 +19,6 @@ package org.proton.plug.context.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; @@ -39,7 +38,14 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class); - private final int numberOfCredits = 100; + /* + The maximum number of credits we will allocate to clients. + This number is also used by the broker when refresh client credits. + */ + private static int maxCreditAllocation = 100; + + // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. + private static int minCreditRefresh = 30; public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, @@ -50,6 +56,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { @Override public void onFlow(int credits, boolean drain) { + flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation); } @Override @@ -86,10 +93,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage()); } + } } - - flow(numberOfCredits); + flow(maxCreditAllocation, minCreditRefresh); } /* @@ -117,12 +124,8 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { receiver.advance(); sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer); - delivery.disposition(Accepted.getInstance()); - delivery.settle(); - if (receiver.getRemoteCredit() < numberOfCredits / 2) { - flow(numberOfCredits); - } + flow(maxCreditAllocation, minCreditRefresh); } } finally { diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java index 080408401e..5fd24d9f6d 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -26,6 +26,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transport.AmqpError; @@ -40,11 +41,10 @@ import org.proton.plug.AMQPSessionCallback; import org.proton.plug.context.AbstractConnectionContext; import org.proton.plug.context.AbstractProtonContextSender; import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.context.ProtonPlugSender; import org.proton.plug.exceptions.ActiveMQAMQPException; import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException; import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle; -import org.proton.plug.context.ProtonPlugSender; -import org.apache.qpid.proton.amqp.messaging.Source; import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS; import static org.proton.plug.AmqpSupport.findFilter; diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java index ebc85f12ca..b917aa6920 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -27,9 +27,9 @@ import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.ProtonJMessage; import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionContext; +import org.proton.plug.SASLResult; import org.proton.plug.context.ProtonPlugSender; import org.proton.plug.context.server.ProtonServerSessionContext; -import org.proton.plug.SASLResult; import org.proton.plug.util.ProtonServerMessage; public class MinimalSessionSPI implements AMQPSessionCallback { @@ -75,6 +75,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback { } + @Override + public void offerProducerCredit(String address, int credits, int threshold, Receiver receiver) { + + } + @Override public void createTemporaryQueue(String address, String queueName) throws Exception { diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md index 054bcce2b3..c1b403518f 100644 --- a/docs/user-manual/en/flow-control.md +++ b/docs/user-manual/en/flow-control.md @@ -273,6 +273,28 @@ control. > a misbehaving client to ignore the flow control credits issued by the broker > and continue sending with out sufficient credit. +#### Blocking producer window based flow control using AMQP + +Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support +flow control. Artemis CORE protocol and AMQP. Both protocols implement flow +control slightly differently and therefore address full BLOCK policy behaves +slightly different for clients uses each protocol respectively. + +As explained earlier in this chapter the CORE protocol uses a producer window size +flow control system. Where credits (representing bytes) are allocated to producers, +if a producer wants to send a message it should wait until it has enough bytes available +to send it. AMQP flow control credits are not representative of bytes but instead represent +the number of messages a producer is permitted to send (regardless of size). + +BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis +will issue 100 credits to a client at a time and refresh them when the clients credits reaches 30. +The broker will stop issuing credits once an address is full. However, since AMQP credits represent +whole messages and not bytes, it would be possible for an AMQP client to significantly exceed an +address upper bound should the broker continue accepting messages until the clients credits are exhausted. +For this reason once an address has reached it's upper bound and is blocked (when using AMQP) Artemis +will start rejecting messages until the address becomes unblocked. This should be taken into consideration when writing +application code. + ### Rate limited flow control Apache ActiveMQ Artemis also allows the rate a producer can emit message to be limited, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 4d41ff5fe0..887427149a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -29,11 +29,15 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.QueueBrowser; +import javax.jms.ResourceAllocationException; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; +import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -48,9 +52,17 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Properties; @@ -66,12 +78,21 @@ import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientReceiverContext; import org.proton.plug.AMQPClientSenderContext; import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.context.server.ProtonServerReceiverContext; import org.proton.plug.test.Constants; import org.proton.plug.test.minimalclient.SimpleAMQPConnector; @RunWith(Parameterized.class) public class ProtonTest extends ActiveMQTestBase { + private static final String amqpConnectionUri = "amqp://localhost:5672"; + + private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; + + private static final String userName = "guest"; + + private static final String password = "guest"; + // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" @Parameterized.Parameters(name = "{0}") @@ -106,6 +127,7 @@ public class ProtonTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); disableCheckThread(); + server = this.createServer(true, true); HashMap params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, "5672"); @@ -113,6 +135,12 @@ public class ProtonTest extends ActiveMQTestBase { TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + addressSettings.setMaxSizeBytes(1 * 1024 * 1024); + server.getConfiguration().getAddressesSettings().put("#", addressSettings); + server.start(); server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false); server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false); @@ -167,7 +195,7 @@ public class ProtonTest extends ActiveMQTestBase { maxCreditAllocation.setInt(null, 1); String destinationAddress = address + 1; - AmqpClient client = new AmqpClient(new URI("tcp://localhost:5672"), userName, password); + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); try { AmqpSession session = amqpConnection.createSession(); @@ -197,9 +225,158 @@ public class ProtonTest extends ActiveMQTestBase { message = (TextMessage) cons.receive(5000); Assert.assertNotNull(message); - } + @Test + public void testResourceLimitExceptionOnAddressFull() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + fillAddress(address + 1); + } + + @Test + public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + String destinationAddress = address + 1; + fillAddress(destinationAddress); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Exception e = null; + try { + Destination d = session.createQueue(destinationAddress); + MessageProducer p = session.createProducer(d); + p.send(session.createBytesMessage()); + } + catch (ResourceAllocationException rae) { + e = rae; + } + assertTrue(e instanceof ResourceAllocationException); + assertTrue(e.getMessage().contains("resource-limit-exceeded")); + } + + @Test + public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + // Only allow 1 credit to be submitted at a time. + Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation"); + maxCreditAllocation.setAccessible(true); + int originalMaxCreditAllocation = maxCreditAllocation.getInt(null); + maxCreditAllocation.setInt(null, 1); + + String destinationAddress = address + 1; + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = client.connect(); + try { + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(destinationAddress); + sender.setSendTimeout(1000); + sendUntilFull(sender); + assertTrue(sender.getSender().getCredit() <= 0); + } + finally { + amqpConnection.close(); + maxCreditAllocation.setInt(null, originalMaxCreditAllocation); + } + } + + @Test + public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + String destinationAddress = address + 1; + int messagesSent = fillAddress(destinationAddress); + + AmqpConnection amqpConnection = null; + try { + amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri)); + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(destinationAddress); + + // Wait for a potential flow frame. + Thread.sleep(500); + assertEquals(0, sender.getSender().getCredit()); + + // Empty Address except for 1 message used later. + AmqpReceiver receiver = session.createReceiver(destinationAddress); + receiver.flow(100); + + AmqpMessage m; + for (int i = 0; i < messagesSent - 1; i++) { + m = receiver.receive(); + m.accept(); + } + + // Wait for address to unblock and flow frame to arrive + Thread.sleep(500); + assertTrue(sender.getSender().getCredit() > 0); + assertNotNull(receiver.receive()); + } + finally { + amqpConnection.close(); + } + } + + @Test + public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + fillAddress(address + 1); + AmqpConnection amqpConnection = null; + try { + amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri)); + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(address + 1); + // Wait for a potential flow frame. + Thread.sleep(1000); + assertEquals(0, sender.getSender().getCredit()); + } + finally { + amqpConnection.close(); + } + } + + /** + * Fills an address. Careful when using this method. Only use when rejected messages are switched on. + * @param address + * @return + * @throws Exception + */ + private int fillAddress(String address) throws Exception { + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = client.connect(); + try { + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(address); + return sendUntilFull(sender); + } + finally { + amqpConnection.close(); + } + } + + private int sendUntilFull(AmqpSender sender) throws IOException { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[50 * 1024]; + + int sentMessages = 0; + int maxMessages = 50; + + Exception e = null; + try { + for (int i = 0; i < maxMessages; i++) { + message.setBytes(payload); + sender.send(message); + sentMessages++; + } + } + catch (IOException ioe) { + e = ioe; + } + + assertNotNull(e); + assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded")); + return sentMessages; + } @Test public void testReplyTo() throws Throwable { @@ -918,7 +1095,7 @@ public class ProtonTest extends ActiveMQTestBase { private javax.jms.Connection createConnection() throws JMSException { Connection connection; if (protocol == 3) { - factory = new JmsConnectionFactory("amqp://localhost:5672"); + factory = new JmsConnectionFactory(amqpConnectionUri); connection = factory.createConnection(); connection.setExceptionListener(new ExceptionListener() { @Override @@ -929,7 +1106,7 @@ public class ProtonTest extends ActiveMQTestBase { connection.start(); } else if (protocol == 0) { - factory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672"); + factory = new JmsConnectionFactory(userName, password, amqpConnectionUri); connection = factory.createConnection(); connection.setExceptionListener(new ExceptionListener() { @Override @@ -950,7 +1127,7 @@ public class ProtonTest extends ActiveMQTestBase { factory = new ActiveMQConnectionFactory(); } - connection = factory.createConnection("guest", "guest"); + connection = factory.createConnection(userName, password); connection.setExceptionListener(new ExceptionListener() { @Override public void onException(JMSException exception) {