AMQ-7065 Update to Qpid JMS v0.37.0

Update to latest client release, adds some tests for split framed
message send / receive

(cherry picked from commit ac1e709dc4)
This commit is contained in:
Timothy Bish 2018-10-02 15:20:11 -04:00
parent 7d2ba5741b
commit 4e627d28fe
3 changed files with 104 additions and 25 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@ -55,46 +59,38 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
});
}
@Rule
public TestName testName = new TestName();
public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
@Rule
public TestName testName = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
private String createLargeString(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
StringBuilder builder = new StringBuilder();
for (int i = 0; i < sizeInBytes; i++) {
builder.append(base[i % base.length]);
}
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
return builder.toString();
@Test(timeout = 60 * 1000)
public void testSendSmallerTextMessage() throws JMSException {
doTestSendTextMessageOfGivenSize(1024);
}
@Test(timeout = 60 * 1000)
public void testSendSmallerMessages() throws JMSException {
public void testSendSeriesOfSmallerTextMessages() throws JMSException {
for (int i = 512; i <= (8 * 1024); i += 512) {
doTestSendLargeMessage(i);
doTestSendTextMessageOfGivenSize(i);
}
}
@Test(timeout = 60 * 1000)
public void testSendFixedSizedMessages() throws JMSException {
doTestSendLargeMessage(65536);
doTestSendLargeMessage(65536 * 2);
doTestSendLargeMessage(65536 * 4);
public void testSendFixedSizedTextMessages() throws JMSException {
doTestSendTextMessageOfGivenSize(65536);
doTestSendTextMessageOfGivenSize(65536 * 2);
doTestSendTextMessageOfGivenSize(65536 * 4);
}
@Test(timeout = 60 * 1000)
public void testSendHugeMessage() throws JMSException {
doTestSendLargeMessage(1024 * 1024 * 10);
public void testSendHugeTextMessage() throws JMSException {
doTestSendTextMessageOfGivenSize(1024 * 1024 * 5);
}
public void doTestSendLargeMessage(int expectedSize) throws JMSException{
public void doTestSendTextMessageOfGivenSize(int expectedSize) throws JMSException{
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
String payload = createLargeString(expectedSize);
assertEquals(expectedSize, payload.getBytes().length);
@ -126,4 +122,86 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
assertEquals(payload, receivedText);
connection.close();
}
@Test(timeout = 60 * 1000)
public void testSendSmallerBytesMessage() throws JMSException {
doTestSendBytesMessageOfGivenSize(1024);
}
@Test(timeout = 60 * 1000)
public void testSendSeriesOfSmallerBytesMessages() throws JMSException {
for (int i = 512; i <= (8 * 1024); i += 512) {
doTestSendBytesMessageOfGivenSize(i);
}
}
@Test(timeout = 60 * 1000)
public void testSendFixedSizedBytesMessages() throws JMSException {
doTestSendBytesMessageOfGivenSize(65536);
doTestSendBytesMessageOfGivenSize(65536 * 2);
doTestSendBytesMessageOfGivenSize(65536 * 4);
}
@Test(timeout = 60 * 1000)
public void testSendHugeBytesMessage() throws JMSException {
doTestSendBytesMessageOfGivenSize(1024 * 1024 * 5);
}
public void doTestSendBytesMessageOfGivenSize(int expectedSize) throws JMSException{
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
byte[] payload = createLargeByteArray(expectedSize);
assertEquals(expectedSize, payload.length);
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(testName.getMethodName());
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.send(message);
long endTime = System.currentTimeMillis();
LOG.info("Returned from send after {} ms", endTime - startTime);
startTime = System.currentTimeMillis();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
LOG.info("Calling receive");
Message receivedMessage = consumer.receive();
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof BytesMessage);
BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
assertNotNull(receivedMessage);
endTime = System.currentTimeMillis();
LOG.info("Returned from receive after {} ms", endTime - startTime);
byte[] receivedBytes = new byte[(int) receivedBytesMessage.getBodyLength()];
receivedBytesMessage.readBytes(receivedBytes);
assertEquals(expectedSize, receivedBytes.length);
assertArrayEquals(payload, receivedBytes);
connection.close();
}
private String createLargeString(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
StringBuilder builder = new StringBuilder();
for (int i = 0; i < sizeInBytes; i++) {
builder.append(base[i % base.length]);
}
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
return builder.toString();
}
private byte[] createLargeByteArray(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
byte[] payload = new byte[sizeInBytes];
for (int i = 0; i < sizeInBytes; i++) {
payload[i] = (base[i % base.length]);
}
LOG.debug("Created byte array with size : " + payload.length + " bytes");
return payload;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@ -457,7 +458,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
try {
encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
break;
} catch (java.nio.BufferOverflowException e) {
} catch (BufferOverflowException | IndexOutOfBoundsException e) {
encodeBuffer = new byte[encodeBuffer.length * 2];
}
}

View File

@ -105,7 +105,7 @@
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.29.0</qpid-proton-version>
<qpid-jms-version>0.36.0</qpid-jms-version>
<qpid-jms-version>0.37.0</qpid-jms-version>
<qpid-jms-netty-version>4.1.28.Final</qpid-jms-netty-version>
<qpid-jms-proton-version>0.29.0</qpid-jms-proton-version>
<netty-all-version>4.1.28.Final</netty-all-version>