From 13a272b37b16949d18cc7ef5264196b388aafcf5 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 28 Mar 2017 16:14:21 -0400 Subject: [PATCH] ARTEMIS-1056 fixing tests When I added flow control, some tests that were using reflection started to fail. Also as a precaution I'm using <= on the flow control low credit check --- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../tests/integration/amqp/ProtonTest.java | 47 ++++++++++--------- .../integration/amqp/ProtonTestBase.java | 6 ++- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 18294e0f4b..58134f511f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -451,7 +451,7 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void run() { synchronized (connection.getLock()) { - if (receiver.getRemoteCredit() < threshold) { + if (receiver.getRemoteCredit() <= threshold) { receiver.flow(credits); connection.flush(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index fb5e90a3ae..199d9c56c8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -41,7 +41,6 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -74,7 +73,6 @@ import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFact import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; -import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ByteUtil; @@ -128,6 +126,7 @@ public class ProtonTest extends ProtonTestBase { return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}}); } + ConnectionFactory factory; private final int protocol; @@ -146,6 +145,14 @@ public class ProtonTest extends ProtonTestBase { private final String address; private Connection connection; + + @Override + protected ActiveMQServer createAMQPServer(int port) throws Exception { + ActiveMQServer server = super.createAMQPServer(port); + server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1"); + return server; + } + @Override @Before public void setUp() throws Exception { @@ -418,14 +425,9 @@ public class ProtonTest extends ProtonTestBase { @Test public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { - // Only allow 1 credit to be submitted at a time. - Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation"); - maxCreditAllocation.setAccessible(true); - int originalMaxCreditAllocation = maxCreditAllocation.getInt(null); - maxCreditAllocation.setInt(null, 1); String destinationAddress = address + 1; - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password); AmqpConnection amqpConnection = client.connect(); try { AmqpSession session = amqpConnection.createSession(); @@ -433,7 +435,6 @@ public class ProtonTest extends ProtonTestBase { assertTrue(sender.getSender().getCredit() == 1); } finally { amqpConnection.close(); - maxCreditAllocation.setInt(null, originalMaxCreditAllocation); } } @@ -609,18 +610,13 @@ public class ProtonTest extends ProtonTestBase { assertTrue(addressSize >= maxSizeBytesRejectThreshold); } - @Test + @Test(timeout = 10000) public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { setAddressFullBlockPolicy(); - // Only allow 1 credit to be submitted at a time. - Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation"); - maxCreditAllocation.setAccessible(true); - int originalMaxCreditAllocation = maxCreditAllocation.getInt(null); - maxCreditAllocation.setInt(null, 1); String destinationAddress = address + 1; - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password); AmqpConnection amqpConnection = client.connect(); try { AmqpSession session = amqpConnection.createSession(); @@ -637,7 +633,6 @@ public class ProtonTest extends ProtonTestBase { assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold); } finally { amqpConnection.close(); - maxCreditAllocation.setInt(null, originalMaxCreditAllocation); } } @@ -771,6 +766,7 @@ public class ProtonTest extends ProtonTestBase { try { for (int i = 0; i < maxMessages; i++) { sender.send(message); + System.out.println("Sent " + i); sentMessages.getAndIncrement(); } timeout.countDown(); @@ -781,13 +777,20 @@ public class ProtonTest extends ProtonTestBase { }; Thread t = new Thread(sendMessages); - t.start(); - timeout.await(5, TimeUnit.SECONDS); + try { + t.start(); - messagesSent = sentMessages.get(); - if (errors[0] != null) { - throw errors[0]; + timeout.await(1, TimeUnit.SECONDS); + + messagesSent = sentMessages.get(); + if (errors[0] != null) { + throw errors[0]; + } + } finally { + t.interrupt(); + t.join(1000); + Assert.assertFalse(t.isAlive()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java index 7057b8b3be..1a06c5449d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -54,9 +53,12 @@ public class ProtonTestBase extends ActiveMQTestBase { params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); HashMap amqpParams = new HashMap<>(); configureAmqp(amqpParams); + + amqpServer.getConfiguration().getAcceptorConfigurations().clear(); + TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams); - amqpServer.getConfiguration().setAcceptorConfigurations(Collections.singleton(transportConfiguration)); + amqpServer.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); amqpServer.getConfiguration().setName(brokerName); amqpServer.getConfiguration().setJournalDirectory(amqpServer.getConfiguration().getJournalDirectory() + port); amqpServer.getConfiguration().setBindingsDirectory(amqpServer.getConfiguration().getBindingsDirectory() + port);