Add default AMQP flow behaviour and fix proton test

This commit is contained in:
Martyn Taylor 2016-07-26 11:11:33 +01:00
parent 110158bb86
commit b549bb243c
3 changed files with 92 additions and 35 deletions

View File

@ -405,6 +405,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void run() {
if (receiver.getRemoteCredit() < threshold) {
receiver.flow(credits);
connection.flush();
}
}
});

View File

@ -58,10 +58,17 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
}
public void flow(int credits, int threshold) {
synchronized (connection.getLock()) {
// Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
}
connection.flush();
else {
synchronized (connection.getLock()) {
receiver.flow(credits);
connection.flush();
}
}
}
public void drain(int credits) {

View File

@ -45,7 +45,9 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -136,9 +138,9 @@ public class ProtonTest extends ActiveMQTestBase {
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
// Default Page
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.start();
@ -230,20 +232,25 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
fillAddress(address + 1);
}
@Test
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
String destinationAddress = address + 1;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination d = session.createQueue(destinationAddress);
MessageProducer p = session.createProducer(d);
fillAddress(destinationAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Exception e = null;
try {
Destination d = session.createQueue(destinationAddress);
MessageProducer p = session.createProducer(d);
p.send(session.createBytesMessage());
}
catch (ResourceAllocationException rae) {
@ -256,6 +263,7 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
// Only allow 1 credit to be submitted at a time.
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
@ -269,9 +277,13 @@ public class ProtonTest extends ActiveMQTestBase {
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress);
sender.setSendTimeout(1000);
sendUntilFull(sender);
assertTrue(sender.getSender().getCredit() <= 0);
// Use blocking send to ensure buffered messages do not interfere with credit.
sender.setSendTimeout(-1);
sendUntilFull(sender, destinationAddress);
// This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
assertTrue(sender.getSender().getCredit() == -1);
}
finally {
amqpConnection.close();
@ -282,13 +294,14 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
String destinationAddress = address + 1;
int messagesSent = fillAddress(destinationAddress);
AmqpConnection amqpConnection = null;
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
try {
amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress);
@ -308,7 +321,8 @@ public class ProtonTest extends ActiveMQTestBase {
// Wait for address to unblock and flow frame to arrive
Thread.sleep(500);
assertTrue(sender.getSender().getCredit() > 0);
assertTrue(sender.getSender().getCredit() == 0);
assertNotNull(receiver.receive());
}
finally {
@ -319,11 +333,12 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
fillAddress(address + 1);
AmqpConnection amqpConnection = null;
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = amqpConnection = client.connect();
try {
amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address + 1);
// Wait for a potential flow frame.
@ -344,38 +359,62 @@ public class ProtonTest extends ActiveMQTestBase {
private int fillAddress(String address) throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
int messagesSent = 0;
Exception exception = null;
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
return sendUntilFull(sender);
messagesSent = sendUntilFull(sender, null);
}
catch (Exception e) {
exception = e;
}
finally {
amqpConnection.close();
}
// Should receive a rejected error
assertNotNull(exception);
assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
return messagesSent;
}
private int sendUntilFull(AmqpSender sender) throws IOException {
AmqpMessage message = new AmqpMessage();
private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024];
message.setBytes(payload);
int sentMessages = 0;
int maxMessages = 50;
final int maxMessages = 50;
final AtomicInteger sentMessages = new AtomicInteger(0);
final Exception[] errors = new Exception[1];
final CountDownLatch timeout = new CountDownLatch(1);
Exception e = null;
try {
for (int i = 0; i < maxMessages; i++) {
message.setBytes(payload);
sender.send(message);
sentMessages++;
Runnable sendMessages = new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < maxMessages; i++) {
sender.send(message);
sentMessages.getAndIncrement();
}
timeout.countDown();
}
catch (IOException e) {
errors[0] = e;
}
}
}
catch (IOException ioe) {
e = ioe;
}
};
assertNotNull(e);
assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded"));
return sentMessages;
Thread t = new Thread(sendMessages);
t.start();
timeout.await(5, TimeUnit.SECONDS);
if (errors[0] != null) {
throw errors[0];
}
return sentMessages.get();
}
@Test
@ -398,7 +437,6 @@ public class ProtonTest extends ActiveMQTestBase {
Destination jmsReplyTo = message.getJMSReplyTo();
Assert.assertNotNull(jmsReplyTo);
Assert.assertNotNull(message);
}
@Test
@ -729,10 +767,13 @@ public class ProtonTest extends ActiveMQTestBase {
consumer.close();
connection.close();
// Wait for Acks to be processed and message removed from queue.
Thread.sleep(500);
Assert.assertEquals(0, getMessageCount(q));
long taken = (System.currentTimeMillis() - time) / 1000;
System.out.println("taken = " + taken);
}
@Test
@ -1140,6 +1181,14 @@ public class ProtonTest extends ActiveMQTestBase {
return connection;
}
private void setAddressFullBlockPolicy() {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}
public static class AnythingSerializable implements Serializable {
private int count;