ARTEMIS-1056 fixing tests

When I added flow control, some tests that were using reflection started to fail.
Also as a precaution I'm using <= on the flow control low credit check
This commit is contained in:
Clebert Suconic 2017-03-28 16:14:21 -04:00 committed by Justin Bertram
parent facc9dbc94
commit 13a272b37b
3 changed files with 30 additions and 25 deletions

View File

@ -451,7 +451,7 @@ public class AMQPSessionCallback implements SessionCallback {
@Override @Override
public void run() { public void run() {
synchronized (connection.getLock()) { synchronized (connection.getLock()) {
if (receiver.getRemoteCredit() < threshold) { if (receiver.getRemoteCredit() <= threshold) {
receiver.flow(credits); receiver.flow(credits);
connection.flush(); connection.flush();
} }

View File

@ -41,7 +41,6 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -74,7 +73,6 @@ import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFact
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
@ -128,6 +126,7 @@ public class ProtonTest extends ProtonTestBase {
return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}}); return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}});
} }
ConnectionFactory factory; ConnectionFactory factory;
private final int protocol; private final int protocol;
@ -146,6 +145,14 @@ public class ProtonTest extends ProtonTestBase {
private final String address; private final String address;
private Connection connection; private Connection connection;
@Override
protected ActiveMQServer createAMQPServer(int port) throws Exception {
ActiveMQServer server = super.createAMQPServer(port);
server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1");
return server;
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -418,14 +425,9 @@ public class ProtonTest extends ProtonTestBase {
@Test @Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
// Only allow 1 credit to be submitted at a time.
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
maxCreditAllocation.setAccessible(true);
int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
maxCreditAllocation.setInt(null, 1);
String destinationAddress = address + 1; String destinationAddress = address + 1;
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
AmqpConnection amqpConnection = client.connect(); AmqpConnection amqpConnection = client.connect();
try { try {
AmqpSession session = amqpConnection.createSession(); AmqpSession session = amqpConnection.createSession();
@ -433,7 +435,6 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(sender.getSender().getCredit() == 1); assertTrue(sender.getSender().getCredit() == 1);
} finally { } finally {
amqpConnection.close(); amqpConnection.close();
maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
} }
} }
@ -609,18 +610,13 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(addressSize >= maxSizeBytesRejectThreshold); assertTrue(addressSize >= maxSizeBytesRejectThreshold);
} }
@Test @Test(timeout = 10000)
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
setAddressFullBlockPolicy(); setAddressFullBlockPolicy();
// Only allow 1 credit to be submitted at a time.
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
maxCreditAllocation.setAccessible(true);
int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
maxCreditAllocation.setInt(null, 1);
String destinationAddress = address + 1; String destinationAddress = address + 1;
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
AmqpConnection amqpConnection = client.connect(); AmqpConnection amqpConnection = client.connect();
try { try {
AmqpSession session = amqpConnection.createSession(); AmqpSession session = amqpConnection.createSession();
@ -637,7 +633,6 @@ public class ProtonTest extends ProtonTestBase {
assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold); assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
} finally { } finally {
amqpConnection.close(); amqpConnection.close();
maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
} }
} }
@ -771,6 +766,7 @@ public class ProtonTest extends ProtonTestBase {
try { try {
for (int i = 0; i < maxMessages; i++) { for (int i = 0; i < maxMessages; i++) {
sender.send(message); sender.send(message);
System.out.println("Sent " + i);
sentMessages.getAndIncrement(); sentMessages.getAndIncrement();
} }
timeout.countDown(); timeout.countDown();
@ -781,13 +777,20 @@ public class ProtonTest extends ProtonTestBase {
}; };
Thread t = new Thread(sendMessages); Thread t = new Thread(sendMessages);
t.start();
timeout.await(5, TimeUnit.SECONDS); try {
t.start();
messagesSent = sentMessages.get(); timeout.await(1, TimeUnit.SECONDS);
if (errors[0] != null) {
throw errors[0]; messagesSent = sentMessages.get();
if (errors[0] != null) {
throw errors[0];
}
} finally {
t.interrupt();
t.join(1000);
Assert.assertFalse(t.isAlive());
} }
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -54,9 +53,12 @@ public class ProtonTestBase extends ActiveMQTestBase {
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
HashMap<String, Object> amqpParams = new HashMap<>(); HashMap<String, Object> amqpParams = new HashMap<>();
configureAmqp(amqpParams); configureAmqp(amqpParams);
amqpServer.getConfiguration().getAcceptorConfigurations().clear();
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams); TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration)); amqpServer.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
amqpServer.getConfiguration().setName(brokerName); amqpServer.getConfiguration().setName(brokerName);
amqpServer.getConfiguration().setJournalDirectory(amqpServer.getConfiguration().getJournalDirectory() + port); amqpServer.getConfiguration().setJournalDirectory(amqpServer.getConfiguration().getJournalDirectory() + port);
amqpServer.getConfiguration().setBindingsDirectory(amqpServer.getConfiguration().getBindingsDirectory() + port); amqpServer.getConfiguration().setBindingsDirectory(amqpServer.getConfiguration().getBindingsDirectory() + port);