NO-JIRA Improving AmqpLargeMessageTest

This includes removing a test that was removed by accident on ddd8ed4402
And improving the test with size parameters.
This commit is contained in:
Clebert Suconic 2020-11-19 09:14:56 -05:00
parent 8259441725
commit 85b9ac3cce
1 changed files with 78 additions and 14 deletions

View File

@ -16,9 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -60,17 +62,33 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpLargeMessageTest.class);
private final Random rand = new Random(System.currentTimeMillis());
private static final int FRAME_SIZE = 32767;
private static final int PAYLOAD = 110 * 1024;
@Parameterized.Parameter(0)
public int frameSize = 32767;
@Parameterized.Parameter(1)
public int payload = 110 * 1024;
@Parameterized.Parameter(2)
public int amqpMinLargeMessageSize = 100 * 1024;
@Parameterized.Parameters(name = "frameSize={0}, payload={1}, amqpMinLargeMessageSize={2}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{32767, 110 * 1024, 100 * 1024}, {2 * 100 * 1024, 10 * 110 * 1024, 4 * 110 * 1024}
});
}
String testQueueName = "ConnectionFrameSize";
@ -83,7 +101,8 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("maxFrameSize", FRAME_SIZE);
params.put("maxFrameSize", frameSize);
params.put("amqpMinLargeMessageSize", amqpMinLargeMessageSize);
}
@Override
@ -129,7 +148,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
AmqpMessage message = createAmqpMessage((byte) 'A', payload);
message.setApplicationProperty("IntProperty", (Integer) 42);
message.setDurable(true);
@ -186,7 +205,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
AmqpSender sender = session.createSender(testQueueName);
for (int i = 0; i < nMsgs; ++i) {
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
AmqpMessage message = createAmqpMessage((byte) 'A', payload);
message.setApplicationProperty("i", (Integer) i);
message.setDurable(true);
sender.send(message);
@ -237,7 +256,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
// converters can change this to AmqValue
Data data = (Data) wrapped.getBody();
instanceLog.debug("received : message: " + data.getValue().getLength());
assertEquals(PAYLOAD, data.getValue().getLength());
assertEquals(payload, data.getValue().getLength());
}
message.accept();
}
@ -267,7 +286,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
AmqpMessage message = createAmqpMessage((byte) 'A', payload);
message.setApplicationProperty("IntProperty", (Integer) 42);
message.setDurable(true);
@ -291,7 +310,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
if (wrapped.getBody() instanceof Data) {
Data data = (Data) wrapped.getBody();
instanceLog.debug("received : message: " + data.getValue().getLength());
assertEquals(PAYLOAD, data.getValue().getLength());
assertEquals(payload, data.getValue().getLength());
}
assertNotNull(message.getWrappedMessage().getMessageAnnotations());
@ -365,6 +384,51 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
return payload;
}
@Test(timeout = 60000)
public void testSendHugeHeader() throws Exception {
doTestSendHugeHeader(payload);
}
@Test(timeout = 60000)
public void testSendLargeMessageWithHugeHeader() throws Exception {
doTestSendHugeHeader(1024 * 1024);
}
public void doTestSendHugeHeader(int expectedSize) throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
connection.connect();
final int strLength = 512 * 1024;
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', expectedSize);
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < strLength; i++) {
buffer.append(" ");
}
message.setApplicationProperty("str", buffer.toString());
message.setDurable(true);
try {
sender.send(message);
fail();
} catch (IOException e) {
Assert.assertTrue(e.getCause() instanceof JMSException);
Assert.assertTrue(e.getMessage().contains("AMQ149005"));
}
session.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {
@ -444,7 +508,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
int numMsgs = 10;
int msgSize = 2_000_000;
int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy
int maxFrameSize = frameSize; // Match the brokers outgoing frame size limit to make window sizing easy
int sessionCapacity = 2_500_000; // Restrict session to 1.x messages in flight at once, make it likely send is partial.
byte[] payload = createLargePayload(msgSize);
@ -510,8 +574,8 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
int numMsgs = 10;
int maxFrameSize = FRAME_SIZE; // Match the brokers outgoing frame size limit to make window sizing easy
int msgSizeA = FRAME_SIZE * 4; // Bigger multi-frame messages
int maxFrameSize = frameSize; // Match the brokers outgoing frame size limit to make window sizing easy
int msgSizeA = frameSize * 4; // Bigger multi-frame messages
int msgSizeB = maxFrameSize / 2; // Smaller single frame messages
int sessionCapacity = msgSizeA + maxFrameSize; // Restrict session to 1.X of the larger messages in flight at once, make it likely send is partial.
@ -847,7 +911,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
ObjectMessage msg = session.createObjectMessage();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < PAYLOAD; ++i) {
for (int i = 0; i < payload; ++i) {
builder.append("A");
}
@ -868,7 +932,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
TextMessage msg = session.createTextMessage();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < PAYLOAD; ++i) {
for (int i = 0; i < payload; ++i) {
builder.append("A");
}
@ -889,7 +953,7 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
BytesMessage msg = session.createBytesMessage();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < PAYLOAD; ++i) {
for (int i = 0; i < payload; ++i) {
builder.append("A");
}