This commit is contained in:
Martyn Taylor 2017-10-19 13:11:39 +01:00
commit ff68b1641a
3 changed files with 166 additions and 61 deletions

View File

@ -404,16 +404,6 @@ public class CoreAmqpConverter {
return new Binary(data);
}
private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
Binary result = null;
String text = message.getText();
if (text != null) {
result = new Binary(text.getBytes(StandardCharsets.UTF_8));
}
return result;
}
private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
message.getInnerMessage().getBodyBuffer().resetReaderIndex();
int size = message.getInnerMessage().getBodyBuffer().readInt();

View File

@ -20,7 +20,6 @@ import javax.jms.BytesMessage;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@ -55,7 +54,7 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
@Override
public long getBodyLength() throws JMSException {
return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET;
return message.getReadOnlyBodyBuffer().readableBytes();
}
@Override

View File

@ -49,6 +49,11 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
final int NUM_MESSAGES = 10;
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test(timeout = 60000)
public void testAddressControlSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
@ -67,7 +72,7 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(address.toString());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(500);
Message message = consumer.receive(5000);
assertNotNull(message);
byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()];
((BytesMessage)message).readBytes(buffer);
@ -112,12 +117,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testBytesMessageSendReceive() throws Throwable {
private void testBytesMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
byte[] bytes = new byte[0xf + 1];
@ -135,8 +138,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
}
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
BytesMessage m = (BytesMessage) consumer.receive(5000);
@ -158,11 +162,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testMessageSendReceive() throws Throwable {
public void testBytesMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testBytesMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testBytesMessageSendReceiveFromCoreToAMQP() throws Throwable {
testBytesMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testBytesMessageSendReceiveFromAMQPToCore() throws Throwable {
testBytesMessageSendReceive(createConnection(), createCoreConnection());
}
private void testMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
byte[] bytes = new byte[0xf + 1];
@ -179,8 +196,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
}
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
Message m = consumer.receive(5000);
@ -192,11 +210,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testMapMessageSendReceive() throws Throwable {
public void testMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMessageSendReceiveFromCoreToAMQP() throws Throwable {
testMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMessageSendReceiveFromAMQPToCore() throws Throwable {
testMessageSendReceive(createConnection(), createCoreConnection());
}
private void testMapMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
@ -209,8 +240,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
}
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
MapMessage m = (MapMessage) consumer.receive(5000);
@ -225,11 +257,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testTextMessageSendReceive() throws Throwable {
public void testMapMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testMapMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMapMessageSendReceiveFromCoreToAMQP() throws Throwable {
testMapMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMapMessageSendReceiveFromAMQPToCore() throws Throwable {
testMapMessageSendReceive(createConnection(), createCoreConnection());
}
private void testTextMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
@ -240,8 +285,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
}
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
TextMessage m = (TextMessage) consumer.receive(5000);
@ -254,9 +300,22 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testStreamMessageSendReceive() throws Throwable {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
public void testTextMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testTextMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testTextMessageSendReceiveFromCoreToAMQP() throws Throwable {
testTextMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testTextMessageSendReceiveFromAMQPToCore() throws Throwable {
testTextMessageSendReceive(createConnection(), createCoreConnection());
}
private void testStreamMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
@ -268,8 +327,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
}
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
StreamMessage m = (StreamMessage) consumer.receive(5000);
@ -282,22 +342,35 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testObjectMessageWithArrayListPayload() throws Throwable {
public void testStreamMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testStreamMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testStreamMessageSendReceiveFromCoreToAMQP() throws Throwable {
testStreamMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testStreamMessageSendReceiveFromAMQPToCore() throws Throwable {
testStreamMessageSendReceive(createConnection(), createCoreConnection());
}
private void testObjectMessageWithArrayListPayload(Connection producerConnection, Connection consumerConnection) throws Throwable {
ArrayList<String> payload = new ArrayList<>();
payload.add("aString");
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
ObjectMessage objectMessage = session.createObjectMessage(payload);
producer.send(objectMessage);
session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue);
connection.start();
session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = session.createQueue(getQueueName());
MessageConsumer cons = session.createConsumer(consumerQueue);
consumerConnection.start();
objectMessage = (ObjectMessage) cons.receive(5000);
assertNotNull(objectMessage);
@ -305,15 +378,28 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
ArrayList<String> received = (ArrayList<String>) objectMessage.getObject();
assertEquals(received.get(0), "aString");
connection.close();
consumerConnection.close();
}
@Test(timeout = 60000)
public void testObjectMessageUsingCustomType() throws Throwable {
public void testObjectMessageWithArrayListPayloadFromAMQPToAMQP() throws Throwable {
testObjectMessageWithArrayListPayload(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageWithArrayListPayloadFromCoreToAMQP() throws Throwable {
testObjectMessageWithArrayListPayload(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageWithArrayListPayloadFromAMQPToCore() throws Throwable {
testObjectMessageWithArrayListPayload(createConnection(), createCoreConnection());
}
private void testObjectMessageUsingCustomType(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis();
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
@ -323,8 +409,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
}
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) {
ObjectMessage msg = (ObjectMessage) consumer.receive(5000);
@ -338,6 +425,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
System.out.println("taken = " + taken);
}
@Test(timeout = 60000)
public void testObjectMessageUsingCustomTypeFromAMQPToAMQP() throws Throwable {
testObjectMessageUsingCustomType(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageUsingCustomTypeFromCoreToAMQP() throws Throwable {
testObjectMessageUsingCustomType(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageUsingCustomTypeFromAMQPToCore() throws Throwable {
testObjectMessageUsingCustomType(createConnection(), createCoreConnection());
}
public static class AnythingSerializable implements Serializable {
private static final long serialVersionUID = 5972085029690947807L;
@ -352,10 +454,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testPropertiesArePreserved() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
private void testPropertiesArePreserved(Connection producerConnection, Connection consumerConnection) throws Exception {
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
@ -372,9 +472,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
producer.send(message);
connection.start();
MessageConsumer messageConsumer = session.createConsumer(queue);
consumerConnection.start();
Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = session.createQueue(getQueueName());
MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue);
TextMessage received = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull(received);
Assert.assertEquals("msg:0", received.getText());
@ -389,6 +490,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
received = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull(received);
connection.close();
consumerConnection.close();
}
}
@Test(timeout = 60000)
public void testPropertiesArePreservedFromAMQPToAMQP() throws Throwable {
testPropertiesArePreserved(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testPropertiesArePreservedFromCoreToAMQP() throws Throwable {
testPropertiesArePreserved(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testPropertiesArePreservedFromAMQPToCore() throws Throwable {
testPropertiesArePreserved(createConnection(), createCoreConnection());
}
}