diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 111de8c42f..8939982f06 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -404,16 +404,6 @@ public class CoreAmqpConverter { return new Binary(data); } - private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { - Binary result = null; - String text = message.getText(); - if (text != null) { - result = new Binary(text.getBytes(StandardCharsets.UTF_8)); - } - - return result; - } - private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { message.getInnerMessage().getBodyBuffer().resetReaderIndex(); int size = message.getInnerMessage().getBodyBuffer().readInt(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index 8d473a7078..b6a829d78c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -20,7 +20,6 @@ import javax.jms.BytesMessage; import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; @@ -55,7 +54,7 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess @Override public long getBodyLength() throws JMSException { - return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET; + return message.getReadOnlyBodyBuffer().readableBytes(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java index 9c7488b878..5154845316 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java @@ -49,6 +49,11 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { final int NUM_MESSAGES = 10; + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + @Test(timeout = 60000) public void testAddressControlSendMessage() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -67,7 +72,7 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); javax.jms.Queue queue = session.createQueue(address.toString()); MessageConsumer consumer = session.createConsumer(queue); - Message message = consumer.receive(500); + Message message = consumer.receive(5000); assertNotNull(message); byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()]; ((BytesMessage)message).readBytes(buffer); @@ -112,12 +117,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } } - @Test(timeout = 60000) - public void testBytesMessageSendReceive() throws Throwable { + private void testBytesMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); byte[] bytes = new byte[0xf + 1]; @@ -135,8 +138,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { BytesMessage m = (BytesMessage) consumer.receive(5000); @@ -158,11 +162,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testMessageSendReceive() throws Throwable { + public void testBytesMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testBytesMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testBytesMessageSendReceiveFromCoreToAMQP() throws Throwable { + testBytesMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testBytesMessageSendReceiveFromAMQPToCore() throws Throwable { + testBytesMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); byte[] bytes = new byte[0xf + 1]; @@ -179,8 +196,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { Message m = consumer.receive(5000); @@ -192,11 +210,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testMapMessageSendReceive() throws Throwable { + public void testMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMessageSendReceiveFromCoreToAMQP() throws Throwable { + testMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMessageSendReceiveFromAMQPToCore() throws Throwable { + testMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testMapMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -209,8 +240,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { MapMessage m = (MapMessage) consumer.receive(5000); @@ -225,11 +257,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testTextMessageSendReceive() throws Throwable { + public void testMapMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testMapMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMapMessageSendReceiveFromCoreToAMQP() throws Throwable { + testMapMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMapMessageSendReceiveFromAMQPToCore() throws Throwable { + testMapMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testTextMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -240,8 +285,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { TextMessage m = (TextMessage) consumer.receive(5000); @@ -254,9 +300,22 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testStreamMessageSendReceive() throws Throwable { - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + public void testTextMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testTextMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testTextMessageSendReceiveFromCoreToAMQP() throws Throwable { + testTextMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testTextMessageSendReceiveFromAMQPToCore() throws Throwable { + testTextMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testStreamMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -268,8 +327,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { StreamMessage m = (StreamMessage) consumer.receive(5000); @@ -282,22 +342,35 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testObjectMessageWithArrayListPayload() throws Throwable { + public void testStreamMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testStreamMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testStreamMessageSendReceiveFromCoreToAMQP() throws Throwable { + testStreamMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testStreamMessageSendReceiveFromAMQPToCore() throws Throwable { + testStreamMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testObjectMessageWithArrayListPayload(Connection producerConnection, Connection consumerConnection) throws Throwable { ArrayList payload = new ArrayList<>(); payload.add("aString"); - Connection connection = createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); ObjectMessage objectMessage = session.createObjectMessage(payload); producer.send(objectMessage); session.close(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer cons = session.createConsumer(queue); - connection.start(); + session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = session.createQueue(getQueueName()); + MessageConsumer cons = session.createConsumer(consumerQueue); + consumerConnection.start(); objectMessage = (ObjectMessage) cons.receive(5000); assertNotNull(objectMessage); @@ -305,15 +378,28 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { ArrayList received = (ArrayList) objectMessage.getObject(); assertEquals(received.get(0), "aString"); - connection.close(); + consumerConnection.close(); } @Test(timeout = 60000) - public void testObjectMessageUsingCustomType() throws Throwable { + public void testObjectMessageWithArrayListPayloadFromAMQPToAMQP() throws Throwable { + testObjectMessageWithArrayListPayload(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageWithArrayListPayloadFromCoreToAMQP() throws Throwable { + testObjectMessageWithArrayListPayload(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageWithArrayListPayloadFromAMQPToCore() throws Throwable { + testObjectMessageWithArrayListPayload(createConnection(), createCoreConnection()); + } + + private void testObjectMessageUsingCustomType(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -323,8 +409,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { ObjectMessage msg = (ObjectMessage) consumer.receive(5000); @@ -338,6 +425,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { System.out.println("taken = " + taken); } + @Test(timeout = 60000) + public void testObjectMessageUsingCustomTypeFromAMQPToAMQP() throws Throwable { + testObjectMessageUsingCustomType(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageUsingCustomTypeFromCoreToAMQP() throws Throwable { + testObjectMessageUsingCustomType(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageUsingCustomTypeFromAMQPToCore() throws Throwable { + testObjectMessageUsingCustomType(createConnection(), createCoreConnection()); + } + public static class AnythingSerializable implements Serializable { private static final long serialVersionUID = 5972085029690947807L; @@ -352,10 +454,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } } - @Test(timeout = 60000) - public void testPropertiesArePreserved() throws Exception { - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + private void testPropertiesArePreserved(Connection producerConnection, Connection consumerConnection) throws Exception { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -372,9 +472,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); producer.send(message); - connection.start(); - - MessageConsumer messageConsumer = session.createConsumer(queue); + consumerConnection.start(); + Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = session.createQueue(getQueueName()); + MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue); TextMessage received = (TextMessage) messageConsumer.receive(5000); Assert.assertNotNull(received); Assert.assertEquals("msg:0", received.getText()); @@ -389,6 +490,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { received = (TextMessage) messageConsumer.receive(5000); Assert.assertNotNull(received); - connection.close(); + consumerConnection.close(); } -} + + @Test(timeout = 60000) + public void testPropertiesArePreservedFromAMQPToAMQP() throws Throwable { + testPropertiesArePreserved(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testPropertiesArePreservedFromCoreToAMQP() throws Throwable { + testPropertiesArePreserved(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testPropertiesArePreservedFromAMQPToCore() throws Throwable { + testPropertiesArePreserved(createConnection(), createCoreConnection()); + } +} \ No newline at end of file