mirror of https://github.com/apache/activemq.git
AMQ-7065 Update to Qpid JMS v0.37.0
Update to latest client release, adds some tests for split framed message send / receive
This commit is contained in:
parent
5246151288
commit
ac1e709dc4
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.amqp;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
|
|||
@RunWith(Parameterized.class)
|
||||
public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
|
||||
|
||||
@Parameters(name="{0}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
@ -55,46 +59,38 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
|
|||
});
|
||||
}
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
|
||||
super(connectorScheme, secure);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
|
||||
|
||||
private String createLargeString(int sizeInBytes) {
|
||||
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < sizeInBytes; i++) {
|
||||
builder.append(base[i % base.length]);
|
||||
}
|
||||
|
||||
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
|
||||
return builder.toString();
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendSmallerTextMessage() throws JMSException {
|
||||
doTestSendTextMessageOfGivenSize(1024);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendSmallerMessages() throws JMSException {
|
||||
public void testSendSeriesOfSmallerTextMessages() throws JMSException {
|
||||
for (int i = 512; i <= (8 * 1024); i += 512) {
|
||||
doTestSendLargeMessage(i);
|
||||
doTestSendTextMessageOfGivenSize(i);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendFixedSizedMessages() throws JMSException {
|
||||
doTestSendLargeMessage(65536);
|
||||
doTestSendLargeMessage(65536 * 2);
|
||||
doTestSendLargeMessage(65536 * 4);
|
||||
public void testSendFixedSizedTextMessages() throws JMSException {
|
||||
doTestSendTextMessageOfGivenSize(65536);
|
||||
doTestSendTextMessageOfGivenSize(65536 * 2);
|
||||
doTestSendTextMessageOfGivenSize(65536 * 4);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendHugeMessage() throws JMSException {
|
||||
doTestSendLargeMessage(1024 * 1024 * 10);
|
||||
public void testSendHugeTextMessage() throws JMSException {
|
||||
doTestSendTextMessageOfGivenSize(1024 * 1024 * 5);
|
||||
}
|
||||
|
||||
public void doTestSendLargeMessage(int expectedSize) throws JMSException{
|
||||
public void doTestSendTextMessageOfGivenSize(int expectedSize) throws JMSException{
|
||||
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
|
||||
String payload = createLargeString(expectedSize);
|
||||
assertEquals(expectedSize, payload.getBytes().length);
|
||||
|
@ -126,4 +122,86 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
|
|||
assertEquals(payload, receivedText);
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendSmallerBytesMessage() throws JMSException {
|
||||
doTestSendBytesMessageOfGivenSize(1024);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendSeriesOfSmallerBytesMessages() throws JMSException {
|
||||
for (int i = 512; i <= (8 * 1024); i += 512) {
|
||||
doTestSendBytesMessageOfGivenSize(i);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendFixedSizedBytesMessages() throws JMSException {
|
||||
doTestSendBytesMessageOfGivenSize(65536);
|
||||
doTestSendBytesMessageOfGivenSize(65536 * 2);
|
||||
doTestSendBytesMessageOfGivenSize(65536 * 4);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testSendHugeBytesMessage() throws JMSException {
|
||||
doTestSendBytesMessageOfGivenSize(1024 * 1024 * 5);
|
||||
}
|
||||
|
||||
public void doTestSendBytesMessageOfGivenSize(int expectedSize) throws JMSException{
|
||||
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
|
||||
byte[] payload = createLargeByteArray(expectedSize);
|
||||
assertEquals(expectedSize, payload.length);
|
||||
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
|
||||
long startTime = System.currentTimeMillis();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(testName.getMethodName());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
BytesMessage message = session.createBytesMessage();
|
||||
message.writeBytes(payload);
|
||||
producer.send(message);
|
||||
long endTime = System.currentTimeMillis();
|
||||
LOG.info("Returned from send after {} ms", endTime - startTime);
|
||||
|
||||
startTime = System.currentTimeMillis();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
LOG.info("Calling receive");
|
||||
Message receivedMessage = consumer.receive();
|
||||
assertNotNull(receivedMessage);
|
||||
assertTrue(receivedMessage instanceof BytesMessage);
|
||||
BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
|
||||
assertNotNull(receivedMessage);
|
||||
endTime = System.currentTimeMillis();
|
||||
LOG.info("Returned from receive after {} ms", endTime - startTime);
|
||||
byte[] receivedBytes = new byte[(int) receivedBytesMessage.getBodyLength()];
|
||||
receivedBytesMessage.readBytes(receivedBytes);
|
||||
assertEquals(expectedSize, receivedBytes.length);
|
||||
assertArrayEquals(payload, receivedBytes);
|
||||
connection.close();
|
||||
}
|
||||
|
||||
private String createLargeString(int sizeInBytes) {
|
||||
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < sizeInBytes; i++) {
|
||||
builder.append(base[i % base.length]);
|
||||
}
|
||||
|
||||
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private byte[] createLargeByteArray(int sizeInBytes) {
|
||||
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
|
||||
|
||||
byte[] payload = new byte[sizeInBytes];
|
||||
for (int i = 0; i < sizeInBytes; i++) {
|
||||
payload[i] = (base[i % base.length]);
|
||||
}
|
||||
|
||||
LOG.debug("Created byte array with size : " + payload.length + " bytes");
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.transport.amqp.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
|
@ -457,7 +458,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
try {
|
||||
encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
|
||||
break;
|
||||
} catch (java.nio.BufferOverflowException e) {
|
||||
} catch (BufferOverflowException | IndexOutOfBoundsException e) {
|
||||
encodeBuffer = new byte[encodeBuffer.length * 2];
|
||||
}
|
||||
}
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -105,7 +105,7 @@
|
|||
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
|
||||
<zookeeper-version>3.4.6</zookeeper-version>
|
||||
<qpid-proton-version>0.29.0</qpid-proton-version>
|
||||
<qpid-jms-version>0.36.0</qpid-jms-version>
|
||||
<qpid-jms-version>0.37.0</qpid-jms-version>
|
||||
<qpid-jms-netty-version>4.1.28.Final</qpid-jms-netty-version>
|
||||
<qpid-jms-proton-version>0.29.0</qpid-jms-proton-version>
|
||||
<netty-all-version>4.1.28.Final</netty-all-version>
|
||||
|
|
Loading…
Reference in New Issue