From 9a3442f0bd29d77e02eca2f57b4e3adfec5b2f75 Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Fri, 15 Jun 2018 20:27:28 +0100 Subject: [PATCH] ARTEMIS-1934: fix handling/accounting of sent amqp connection data updates max frame size tests to verify behaviour seen with standalone brokers rather than non represenative test-only conditions, as well as more closely validate the recieved messages --- .../apache/activemq/artemis/junit/Wait.java | 6 +- .../amqp/proton/handler/ProtonHandler.java | 4 +- .../transport/amqp/client/AmqpConnection.java | 7 +- .../amqp/AmqpMaxFrameSizeTest.java | 236 +++++++++++++++--- 4 files changed, 221 insertions(+), 32 deletions(-) diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java index 679b112dd6..e37539a3a6 100644 --- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java +++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/Wait.java @@ -65,7 +65,11 @@ public class Wait { } public static void assertEquals(int size, IntCondition condition, long timeout) throws Exception { - boolean result = waitFor(() -> condition.getCount() == size, timeout); + assertEquals(size, condition, timeout, SLEEP_MILLIS); + } + + public static void assertEquals(int size, IntCondition condition, long timeout, long sleepMillis) throws Exception { + boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis); if (!result) { Assert.fail(size + " != " + condition.getCount()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 11a89d9d83..38ca7a7978 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -209,7 +209,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { lock.lock(); try { while (true) { - int pending = transport.pending(); + ByteBuffer head = transport.head(); + int pending = head.remaining(); if (pending <= 0) { break; @@ -217,7 +218,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { // We allocated a Pooled Direct Buffer, that will be sent down the stream ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(pending); - ByteBuffer head = transport.head(); buffer.writeBytes(head); for (EventHandler handler : handlers) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index c77184fead..a00f4ae7e9 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -103,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private boolean idleProcessingDisabled; private String containerId; private boolean authenticated; + private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int channelMax = DEFAULT_CHANNEL_MAX; private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; @@ -367,7 +368,11 @@ public class AmqpConnection extends AmqpAbstractResource implements * @return the currently set Max Frame Size value. */ public int getMaxFrameSize() { - return DEFAULT_MAX_FRAME_SIZE; + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; } public int getChannelMax() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java index de3ada48b8..5931ec8396 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java @@ -20,7 +20,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -28,22 +31,40 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { - private static final int FRAME_SIZE = 512; + protected static final Logger LOG = LoggerFactory.getLogger(AmqpMaxFrameSizeTest.class); + + private boolean maxFrameSizeConfigSet = false; + private static final int CONFIGURED_FRAME_SIZE = 4321; @Override protected void configureAMQPAcceptorParameters(Map params) { - params.put("maxFrameSize", FRAME_SIZE); + if ("testBrokerAdvertisedConfiguredMaxFrameSize".equals(getTestName())) { + maxFrameSizeConfigSet = true; + params.put("maxFrameSize", CONFIGURED_FRAME_SIZE); + } + } + + @Override + protected void addConfiguration(ActiveMQServer server) { + // Make the journal file size larger than the frame+message sizes used in the tests, + // since it is by default for external brokers and it changes the behaviour. + server.getConfiguration().setJournalFileSize(2 * 1024 * 1024); } @Test(timeout = 60000) - public void testBrokerHonorsSetMaxFrameSize() throws Exception { + public void testBrokerAdvertisedDefaultMaxFrameSize() throws Exception { + assertFalse("maxFrameSize should not be explicitly configured", maxFrameSizeConfigSet); + AmqpClient client = createAmqpClient(); assertNotNull(client); @@ -52,7 +73,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { @Override public void inspectOpenedResource(Connection connection) { int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize(); - if (brokerMaxFrameSize != FRAME_SIZE) { + if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) { markAsInvalid("Broker did not send the expected max Frame Size"); } } @@ -68,42 +89,100 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testMultipleTransfers() throws Exception { - server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); - - String testQueueName = "ConnectionFrameSize"; - int nMsgs = 200; + public void testBrokerAdvertisedConfiguredMaxFrameSize() throws Exception { + assertTrue("maxFrameSize should be explicitly configured", maxFrameSizeConfigSet); AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize(); + if (brokerMaxFrameSize != CONFIGURED_FRAME_SIZE) { + markAsInvalid("Broker did not send the expected max Frame Size"); + } + } + }); + AmqpConnection connection = addConnection(client.connect()); + try { + assertNotNull(connection); + connection.getStateInspector().assertValid(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testManyMultiFrameTransfersWithClientMaxFrameSizeSmallerThanBrokers() throws Exception { + final int clientMaxFrameSize = 1024; + final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT; + final int messageSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT + 5; + + assertTrue("Client maxFrameSize should be smaller than brokers", clientMaxFrameSize < brokerMaxFrameSize); + + doManyMultiFrameTransfersTestImpl(clientMaxFrameSize, messageSize, brokerMaxFrameSize); + } + + @Test(timeout = 60000) + public void testManyMultiFrameTransfersWithClientMaxFrameSizeLargerThanBrokers() throws Exception { + final int clientMaxFrameSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT; + final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT; + final int messageSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT + 5; + + assertTrue("Client maxFrameSize should be larger than brokers", clientMaxFrameSize > brokerMaxFrameSize); + + doManyMultiFrameTransfersTestImpl(clientMaxFrameSize, messageSize, brokerMaxFrameSize); + } + + private void doManyMultiFrameTransfersTestImpl(int maxFrameSize, int payloadSize, int brokerMaxFrameSize) throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int numMsgs = 200; + String testQueueName = getTestName(); + + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + @Override + public void inspectOpenedResource(Connection connection) { + int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize(); + if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) { + markAsInvalid("Broker did not send the expected max Frame Size"); + } + } + }); + + AmqpConnection connection = client.createConnection(); + connection.setMaxFrameSize(maxFrameSize); + + connection.connect(); + addConnection(connection); try { - connection.connect(); - AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender(testQueueName); - final int payload = FRAME_SIZE * 16; - - for (int i = 0; i < nMsgs; ++i) { - AmqpMessage message = createAmqpMessage((byte) 'A', payload); + for (int i = 0; i < numMsgs; ++i) { + AmqpMessage message = createAmqpMessage(payloadSize); sender.send(message); } - int count = getMessageCount(server.getPostOffice(), testQueueName); - assertEquals(nMsgs, count); + Wait.assertEquals(numMsgs, () -> getMessageCount(server.getPostOffice(), testQueueName), 5000, 10); AmqpReceiver receiver = session.createReceiver(testQueueName); - receiver.flow(nMsgs); + receiver.flow(numMsgs); - for (int i = 0; i < nMsgs; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull("failed at " + i, message); - MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); - Data data = (Data) wrapped.getBody(); - System.out.println("received : message: " + data.getValue().getLength()); - assertEquals(payload, data.getValue().getLength()); - message.accept(); + for (int i = 1; i <= numMsgs; ++i) { + AmqpMessage receivedMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Did not recieve message " + i, receivedMessage); + + verifyMessage(receivedMessage, payloadSize); + + LOG.trace("received : message " + i); + receivedMessage.accept(); } } finally { @@ -111,13 +190,114 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { } } - private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + @Test(timeout = 60000) + public void testSingleAndMultiFrameTransferClientMaxFrameSizeSmallerThanBrokers() throws Exception { + final int clientMaxFrameSize = 1024; + final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT; + + assertTrue("Client maxFrameSize should be smaller than brokers", clientMaxFrameSize < brokerMaxFrameSize); + + doSingleAndMultiFrameTransferTestImpl(clientMaxFrameSize, brokerMaxFrameSize); + } + + @Test(timeout = 60000) + public void testSingleAndMultiFrameTransferWithClientMaxFrameSizeLargerThanBrokers() throws Exception { + final int clientMaxFrameSize = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT; + final int brokerMaxFrameSize = AmqpSupport.MAX_FRAME_SIZE_DEFAULT; + + assertTrue("Client maxFrameSize should be larger than brokers", clientMaxFrameSize > brokerMaxFrameSize); + + doSingleAndMultiFrameTransferTestImpl(clientMaxFrameSize, brokerMaxFrameSize); + } + + private void doSingleAndMultiFrameTransferTestImpl(int maxFrameSize, int brokerMaxFrameSize) throws Exception { + final int messageSize1 = 128; + final int messageSize2 = 2 * AmqpSupport.MAX_FRAME_SIZE_DEFAULT + 5; + + assertTrue("messageSize1 should be much smaller than both of the maxFrameSizes", + messageSize1 < maxFrameSize / 2 && messageSize1 < brokerMaxFrameSize / 2); + assertTrue("messageSize2 should be larger than one of the maxFrameSizes", + messageSize2 > maxFrameSize || messageSize2 > brokerMaxFrameSize); + + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = getTestName(); + + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + @Override + public void inspectOpenedResource(Connection connection) { + int brokerMaxFrameSize = connection.getTransport().getRemoteMaxFrameSize(); + if (brokerMaxFrameSize != AmqpSupport.MAX_FRAME_SIZE_DEFAULT) { + markAsInvalid("Broker did not send the expected max Frame Size"); + } + } + }); + + AmqpConnection connection = client.createConnection(); + connection.setMaxFrameSize(maxFrameSize); + + connection.connect(); + addConnection(connection); + + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + + AmqpMessage message1 = createAmqpMessage(messageSize1); + AmqpMessage message2 = createAmqpMessage(messageSize2); + sender.send(message1); + sender.send(message2); + + Wait.assertEquals(2, () -> getMessageCount(server.getPostOffice(), testQueueName), 5000, 10); + + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(2); + + AmqpMessage receivedMessage1 = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Did not recieve message 1", receivedMessage1); + verifyMessage(receivedMessage1, messageSize1); + receivedMessage1.accept(); + + AmqpMessage receivedMessage2 = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Did not recieve message 2", receivedMessage2); + verifyMessage(receivedMessage2, messageSize2); + + receivedMessage2.accept(); + } finally { + connection.close(); + } + } + + private AmqpMessage createAmqpMessage(final int payloadSize) { AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[payloadSize]; for (int i = 0; i < payload.length; i++) { - payload[i] = value; + // An odd number of digit characters + int offset = i % 7; + payload[i] = (byte) (48 + offset); } message.setBytes(payload); return message; } + + private void verifyMessage(final AmqpMessage message, final int payloadSize) { + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + + assertNotNull("Message has no body", wrapped.getBody()); + assertTrue("Unexpected body type: " + wrapped.getBody().getClass(), wrapped.getBody() instanceof Data); + + Data data = (Data) wrapped.getBody(); + Binary binary = data.getValue(); + assertNotNull("Data section has no content", binary); + assertEquals("Unexpected payload length", payloadSize, binary.getLength()); + + byte[] binaryContent = binary.getArray(); + int offset = binary.getArrayOffset(); + for (int i = 0; i < payloadSize; i++) { + byte expected = (byte) (48 + (i % 7)); + assertEquals("Unexpected content at payload index " + i, expected, binaryContent[i + offset]); + } + } }