From 85b9ac3cce426fe528a8c19d328ffa03528f7ef5 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 19 Nov 2020 09:14:56 -0500 Subject: [PATCH] NO-JIRA Improving AmqpLargeMessageTest This includes removing a test that was removed by accident on ddd8ed440226fa9099f894fa0dd5c1e03614b7da And improving the test with size parameters. --- .../amqp/AmqpLargeMessageTest.java | 92 ++++++++++++++++--- 1 file changed, 78 insertions(+), 14 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index 3d708bfdcb..6d8470b6d9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -60,17 +62,33 @@ import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class AmqpLargeMessageTest extends AmqpClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(AmqpLargeMessageTest.class); private final Random rand = new Random(System.currentTimeMillis()); - private static final int FRAME_SIZE = 32767; - private static final int PAYLOAD = 110 * 1024; + @Parameterized.Parameter(0) + public int frameSize = 32767; + + @Parameterized.Parameter(1) + public int payload = 110 * 1024; + + @Parameterized.Parameter(2) + public int amqpMinLargeMessageSize = 100 * 1024; + + @Parameterized.Parameters(name = "frameSize={0}, payload={1}, amqpMinLargeMessageSize={2}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {32767, 110 * 1024, 100 * 1024}, {2 * 100 * 1024, 10 * 110 * 1024, 4 * 110 * 1024} + }); + } String testQueueName = "ConnectionFrameSize"; @@ -83,7 +101,8 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { @Override protected void configureAMQPAcceptorParameters(Map params) { - params.put("maxFrameSize", FRAME_SIZE); + params.put("maxFrameSize", frameSize); + params.put("amqpMinLargeMessageSize", amqpMinLargeMessageSize); } @Override @@ -129,7 +148,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender(testQueueName); - AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + AmqpMessage message = createAmqpMessage((byte) 'A', payload); message.setApplicationProperty("IntProperty", (Integer) 42); message.setDurable(true); @@ -186,7 +205,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { AmqpSender sender = session.createSender(testQueueName); for (int i = 0; i < nMsgs; ++i) { - AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + AmqpMessage message = createAmqpMessage((byte) 'A', payload); message.setApplicationProperty("i", (Integer) i); message.setDurable(true); sender.send(message); @@ -237,7 +256,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { // converters can change this to AmqValue Data data = (Data) wrapped.getBody(); instanceLog.debug("received : message: " + data.getValue().getLength()); - assertEquals(PAYLOAD, data.getValue().getLength()); + assertEquals(payload, data.getValue().getLength()); } message.accept(); } @@ -267,7 +286,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender(testQueueName); - AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + AmqpMessage message = createAmqpMessage((byte) 'A', payload); message.setApplicationProperty("IntProperty", (Integer) 42); message.setDurable(true); @@ -291,7 +310,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { if (wrapped.getBody() instanceof Data) { Data data = (Data) wrapped.getBody(); instanceLog.debug("received : message: " + data.getValue().getLength()); - assertEquals(PAYLOAD, data.getValue().getLength()); + assertEquals(payload, data.getValue().getLength()); } assertNotNull(message.getWrappedMessage().getMessageAnnotations()); @@ -365,6 +384,51 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { return payload; } + @Test(timeout = 60000) + public void testSendHugeHeader() throws Exception { + doTestSendHugeHeader(payload); + } + + @Test(timeout = 60000) + public void testSendLargeMessageWithHugeHeader() throws Exception { + doTestSendHugeHeader(1024 * 1024); + } + + public void doTestSendHugeHeader(int expectedSize) throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + + connection.connect(); + + final int strLength = 512 * 1024; + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + + AmqpMessage message = createAmqpMessage((byte) 'A', expectedSize); + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < strLength; i++) { + buffer.append(" "); + } + message.setApplicationProperty("str", buffer.toString()); + message.setDurable(true); + + try { + sender.send(message); + fail(); + } catch (IOException e) { + Assert.assertTrue(e.getCause() instanceof JMSException); + Assert.assertTrue(e.getMessage().contains("AMQ149005")); + } + + session.close(); + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testSendSmallerMessages() throws Exception { for (int i = 512; i <= (8 * 1024); i += 512) { @@ -444,7 +508,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { int numMsgs = 10; int msgSize = 2_000_000; - int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy + int maxFrameSize = frameSize; // Match the brokers outgoing frame size limit to make window sizing easy int sessionCapacity = 2_500_000; // Restrict session to 1.x messages in flight at once, make it likely send is partial. byte[] payload = createLargePayload(msgSize); @@ -510,8 +574,8 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); int numMsgs = 10; - int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy - int msgSizeA = FRAME_SIZE * 4; // Bigger multi-frame messages + int maxFrameSize = frameSize; // Match the brokers outgoing frame size limit to make window sizing easy + int msgSizeA = frameSize * 4; // Bigger multi-frame messages int msgSizeB = maxFrameSize / 2; // Smaller single frame messages int sessionCapacity = msgSizeA + maxFrameSize; // Restrict session to 1.X of the larger messages in flight at once, make it likely send is partial. @@ -847,7 +911,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { ObjectMessage msg = session.createObjectMessage(); StringBuilder builder = new StringBuilder(); - for (int i = 0; i < PAYLOAD; ++i) { + for (int i = 0; i < payload; ++i) { builder.append("A"); } @@ -868,7 +932,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { TextMessage msg = session.createTextMessage(); StringBuilder builder = new StringBuilder(); - for (int i = 0; i < PAYLOAD; ++i) { + for (int i = 0; i < payload; ++i) { builder.append("A"); } @@ -889,7 +953,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { BytesMessage msg = session.createBytesMessage(); StringBuilder builder = new StringBuilder(); - for (int i = 0; i < PAYLOAD; ++i) { + for (int i = 0; i < payload; ++i) { builder.append("A"); }