This closes #663 Add default AMQP flow behaviour and fix proton test

This commit is contained in:
Andy Taylor 2016-07-26 12:29:55 +01:00
commit 6cb681555a
3 changed files with 92 additions and 35 deletions

View File

@ -405,6 +405,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void run() { public void run() {
if (receiver.getRemoteCredit() < threshold) { if (receiver.getRemoteCredit() < threshold) {
receiver.flow(credits); receiver.flow(credits);
connection.flush();
} }
} }
}); });

View File

@ -58,11 +58,18 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
} }
public void flow(int credits, int threshold) { public void flow(int credits, int threshold) {
synchronized (connection.getLock()) { // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver); sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
} }
else {
synchronized (connection.getLock()) {
receiver.flow(credits);
connection.flush(); connection.flush();
} }
}
}
public void drain(int credits) { public void drain(int credits) {
synchronized (connection.getLock()) { synchronized (connection.getLock()) {

View File

@ -45,7 +45,9 @@ import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -136,9 +138,9 @@ public class ProtonTest extends ActiveMQTestBase {
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
// Default Page
AddressSettings addressSettings = new AddressSettings(); AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
server.getConfiguration().getAddressesSettings().put("#", addressSettings); server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.start(); server.start();
@ -230,20 +232,25 @@ public class ProtonTest extends ActiveMQTestBase {
@Test @Test
public void testResourceLimitExceptionOnAddressFull() throws Exception { public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
fillAddress(address + 1); fillAddress(address + 1);
} }
@Test @Test
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception { public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
String destinationAddress = address + 1; setAddressFullBlockPolicy();
fillAddress(destinationAddress);
String destinationAddress = address + 1;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Exception e = null;
try {
Destination d = session.createQueue(destinationAddress); Destination d = session.createQueue(destinationAddress);
MessageProducer p = session.createProducer(d); MessageProducer p = session.createProducer(d);
fillAddress(destinationAddress);
Exception e = null;
try {
p.send(session.createBytesMessage()); p.send(session.createBytesMessage());
} }
catch (ResourceAllocationException rae) { catch (ResourceAllocationException rae) {
@ -256,6 +263,7 @@ public class ProtonTest extends ActiveMQTestBase {
@Test @Test
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
// Only allow 1 credit to be submitted at a time. // Only allow 1 credit to be submitted at a time.
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation"); Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
@ -269,9 +277,13 @@ public class ProtonTest extends ActiveMQTestBase {
try { try {
AmqpSession session = amqpConnection.createSession(); AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress); AmqpSender sender = session.createSender(destinationAddress);
sender.setSendTimeout(1000);
sendUntilFull(sender); // Use blocking send to ensure buffered messages do not interfere with credit.
assertTrue(sender.getSender().getCredit() <= 0); sender.setSendTimeout(-1);
sendUntilFull(sender, destinationAddress);
// This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
assertTrue(sender.getSender().getCredit() == -1);
} }
finally { finally {
amqpConnection.close(); amqpConnection.close();
@ -282,13 +294,14 @@ public class ProtonTest extends ActiveMQTestBase {
@Test @Test
public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
String destinationAddress = address + 1; String destinationAddress = address + 1;
int messagesSent = fillAddress(destinationAddress); int messagesSent = fillAddress(destinationAddress);
AmqpConnection amqpConnection = null; AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
try { try {
amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession(); AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress); AmqpSender sender = session.createSender(destinationAddress);
@ -308,7 +321,8 @@ public class ProtonTest extends ActiveMQTestBase {
// Wait for address to unblock and flow frame to arrive // Wait for address to unblock and flow frame to arrive
Thread.sleep(500); Thread.sleep(500);
assertTrue(sender.getSender().getCredit() > 0);
assertTrue(sender.getSender().getCredit() == 0);
assertNotNull(receiver.receive()); assertNotNull(receiver.receive());
} }
finally { finally {
@ -319,11 +333,12 @@ public class ProtonTest extends ActiveMQTestBase {
@Test @Test
public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception { public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
fillAddress(address + 1); fillAddress(address + 1);
AmqpConnection amqpConnection = null; AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
try { try {
amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession(); AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address + 1); AmqpSender sender = session.createSender(address + 1);
// Wait for a potential flow frame. // Wait for a potential flow frame.
@ -344,38 +359,62 @@ public class ProtonTest extends ActiveMQTestBase {
private int fillAddress(String address) throws Exception { private int fillAddress(String address) throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect(); AmqpConnection amqpConnection = client.connect();
int messagesSent = 0;
Exception exception = null;
try { try {
AmqpSession session = amqpConnection.createSession(); AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address); AmqpSender sender = session.createSender(address);
return sendUntilFull(sender); messagesSent = sendUntilFull(sender, null);
}
catch (Exception e) {
exception = e;
} }
finally { finally {
amqpConnection.close(); amqpConnection.close();
} }
// Should receive a rejected error
assertNotNull(exception);
assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
return messagesSent;
} }
private int sendUntilFull(AmqpSender sender) throws IOException { private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception {
AmqpMessage message = new AmqpMessage(); final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024]; byte[] payload = new byte[50 * 1024];
message.setBytes(payload);
int sentMessages = 0; final int maxMessages = 50;
int maxMessages = 50; final AtomicInteger sentMessages = new AtomicInteger(0);
final Exception[] errors = new Exception[1];
final CountDownLatch timeout = new CountDownLatch(1);
Exception e = null; Runnable sendMessages = new Runnable() {
@Override
public void run() {
try { try {
for (int i = 0; i < maxMessages; i++) { for (int i = 0; i < maxMessages; i++) {
message.setBytes(payload);
sender.send(message); sender.send(message);
sentMessages++; sentMessages.getAndIncrement();
}
timeout.countDown();
}
catch (IOException e) {
errors[0] = e;
} }
} }
catch (IOException ioe) { };
e = ioe;
}
assertNotNull(e); Thread t = new Thread(sendMessages);
assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded")); t.start();
return sentMessages;
timeout.await(5, TimeUnit.SECONDS);
if (errors[0] != null) {
throw errors[0];
}
return sentMessages.get();
} }
@Test @Test
@ -398,7 +437,6 @@ public class ProtonTest extends ActiveMQTestBase {
Destination jmsReplyTo = message.getJMSReplyTo(); Destination jmsReplyTo = message.getJMSReplyTo();
Assert.assertNotNull(jmsReplyTo); Assert.assertNotNull(jmsReplyTo);
Assert.assertNotNull(message); Assert.assertNotNull(message);
} }
@Test @Test
@ -729,10 +767,13 @@ public class ProtonTest extends ActiveMQTestBase {
consumer.close(); consumer.close();
connection.close(); connection.close();
// Wait for Acks to be processed and message removed from queue.
Thread.sleep(500);
Assert.assertEquals(0, getMessageCount(q)); Assert.assertEquals(0, getMessageCount(q));
long taken = (System.currentTimeMillis() - time) / 1000; long taken = (System.currentTimeMillis() - time) / 1000;
System.out.println("taken = " + taken); System.out.println("taken = " + taken);
} }
@Test @Test
@ -1140,6 +1181,14 @@ public class ProtonTest extends ActiveMQTestBase {
return connection; return connection;
} }
private void setAddressFullBlockPolicy() {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}
public static class AnythingSerializable implements Serializable { public static class AnythingSerializable implements Serializable {
private int count; private int count;