From c1fcadb70668adb95e05f395b62327628b0f05b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Tue, 17 Oct 2017 09:20:34 +0100 Subject: [PATCH] ARTEMIS-1464 Fix Core to AMQP conversion BytesMessage corrupts bytes Extend test cases in MessageTypesTest to cover Core to AMQP combinations for all JMSTypeTests Fix ServerJMSBytesMessage to correctly return bodyLength Remove unused/dead code --- .../amqp/converter/CoreAmqpConverter.java | 10 - .../converter/jms/ServerJMSBytesMessage.java | 3 +- .../integration/amqp/JMSMessageTypesTest.java | 214 ++++++++++++++---- 3 files changed, 166 insertions(+), 61 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 111de8c42f..8939982f06 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -404,16 +404,6 @@ public class CoreAmqpConverter { return new Binary(data); } - private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException { - Binary result = null; - String text = message.getText(); - if (text != null) { - result = new Binary(text.getBytes(StandardCharsets.UTF_8)); - } - - return result; - } - private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { message.getInnerMessage().getBodyBuffer().resetReaderIndex(); int size = message.getInnerMessage().getBodyBuffer().readInt(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index 8d473a7078..b6a829d78c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -20,7 +20,6 @@ import javax.jms.BytesMessage; import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; @@ -55,7 +54,7 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess @Override public long getBodyLength() throws JMSException { - return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET; + return message.getReadOnlyBodyBuffer().readableBytes(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java index 9c7488b878..5154845316 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java @@ -49,6 +49,11 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { final int NUM_MESSAGES = 10; + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + @Test(timeout = 60000) public void testAddressControlSendMessage() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -67,7 +72,7 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); javax.jms.Queue queue = session.createQueue(address.toString()); MessageConsumer consumer = session.createConsumer(queue); - Message message = consumer.receive(500); + Message message = consumer.receive(5000); assertNotNull(message); byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()]; ((BytesMessage)message).readBytes(buffer); @@ -112,12 +117,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } } - @Test(timeout = 60000) - public void testBytesMessageSendReceive() throws Throwable { + private void testBytesMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); byte[] bytes = new byte[0xf + 1]; @@ -135,8 +138,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { BytesMessage m = (BytesMessage) consumer.receive(5000); @@ -158,11 +162,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testMessageSendReceive() throws Throwable { + public void testBytesMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testBytesMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testBytesMessageSendReceiveFromCoreToAMQP() throws Throwable { + testBytesMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testBytesMessageSendReceiveFromAMQPToCore() throws Throwable { + testBytesMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); byte[] bytes = new byte[0xf + 1]; @@ -179,8 +196,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { Message m = consumer.receive(5000); @@ -192,11 +210,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testMapMessageSendReceive() throws Throwable { + public void testMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMessageSendReceiveFromCoreToAMQP() throws Throwable { + testMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMessageSendReceiveFromAMQPToCore() throws Throwable { + testMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testMapMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -209,8 +240,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { MapMessage m = (MapMessage) consumer.receive(5000); @@ -225,11 +257,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testTextMessageSendReceive() throws Throwable { + public void testMapMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testMapMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMapMessageSendReceiveFromCoreToAMQP() throws Throwable { + testMapMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testMapMessageSendReceiveFromAMQPToCore() throws Throwable { + testMapMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testTextMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -240,8 +285,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { TextMessage m = (TextMessage) consumer.receive(5000); @@ -254,9 +300,22 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testStreamMessageSendReceive() throws Throwable { - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + public void testTextMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testTextMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testTextMessageSendReceiveFromCoreToAMQP() throws Throwable { + testTextMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testTextMessageSendReceiveFromAMQPToCore() throws Throwable { + testTextMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testStreamMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -268,8 +327,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { StreamMessage m = (StreamMessage) consumer.receive(5000); @@ -282,22 +342,35 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testObjectMessageWithArrayListPayload() throws Throwable { + public void testStreamMessageSendReceiveFromAMQPToAMQP() throws Throwable { + testStreamMessageSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testStreamMessageSendReceiveFromCoreToAMQP() throws Throwable { + testStreamMessageSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testStreamMessageSendReceiveFromAMQPToCore() throws Throwable { + testStreamMessageSendReceive(createConnection(), createCoreConnection()); + } + + private void testObjectMessageWithArrayListPayload(Connection producerConnection, Connection consumerConnection) throws Throwable { ArrayList payload = new ArrayList<>(); payload.add("aString"); - Connection connection = createConnection(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); ObjectMessage objectMessage = session.createObjectMessage(payload); producer.send(objectMessage); session.close(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer cons = session.createConsumer(queue); - connection.start(); + session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = session.createQueue(getQueueName()); + MessageConsumer cons = session.createConsumer(consumerQueue); + consumerConnection.start(); objectMessage = (ObjectMessage) cons.receive(5000); assertNotNull(objectMessage); @@ -305,15 +378,28 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { ArrayList received = (ArrayList) objectMessage.getObject(); assertEquals(received.get(0), "aString"); - connection.close(); + consumerConnection.close(); } @Test(timeout = 60000) - public void testObjectMessageUsingCustomType() throws Throwable { + public void testObjectMessageWithArrayListPayloadFromAMQPToAMQP() throws Throwable { + testObjectMessageWithArrayListPayload(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageWithArrayListPayloadFromCoreToAMQP() throws Throwable { + testObjectMessageWithArrayListPayload(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageWithArrayListPayloadFromAMQPToCore() throws Throwable { + testObjectMessageWithArrayListPayload(createConnection(), createCoreConnection()); + } + + private void testObjectMessageUsingCustomType(Connection producerConnection, Connection consumerConnection) throws Throwable { long time = System.currentTimeMillis(); - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -323,8 +409,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); } - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); for (int i = 0; i < NUM_MESSAGES; i++) { ObjectMessage msg = (ObjectMessage) consumer.receive(5000); @@ -338,6 +425,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { System.out.println("taken = " + taken); } + @Test(timeout = 60000) + public void testObjectMessageUsingCustomTypeFromAMQPToAMQP() throws Throwable { + testObjectMessageUsingCustomType(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageUsingCustomTypeFromCoreToAMQP() throws Throwable { + testObjectMessageUsingCustomType(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testObjectMessageUsingCustomTypeFromAMQPToCore() throws Throwable { + testObjectMessageUsingCustomType(createConnection(), createCoreConnection()); + } + public static class AnythingSerializable implements Serializable { private static final long serialVersionUID = 5972085029690947807L; @@ -352,10 +454,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { } } - @Test(timeout = 60000) - public void testPropertiesArePreserved() throws Exception { - Connection connection = createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + private void testPropertiesArePreserved(Connection producerConnection, Connection consumerConnection) throws Exception { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageProducer producer = session.createProducer(queue); @@ -372,9 +472,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); producer.send(message); - connection.start(); - - MessageConsumer messageConsumer = session.createConsumer(queue); + consumerConnection.start(); + Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = session.createQueue(getQueueName()); + MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue); TextMessage received = (TextMessage) messageConsumer.receive(5000); Assert.assertNotNull(received); Assert.assertEquals("msg:0", received.getText()); @@ -389,6 +490,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { received = (TextMessage) messageConsumer.receive(5000); Assert.assertNotNull(received); - connection.close(); + consumerConnection.close(); } -} + + @Test(timeout = 60000) + public void testPropertiesArePreservedFromAMQPToAMQP() throws Throwable { + testPropertiesArePreserved(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testPropertiesArePreservedFromCoreToAMQP() throws Throwable { + testPropertiesArePreserved(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testPropertiesArePreservedFromAMQPToCore() throws Throwable { + testPropertiesArePreserved(createConnection(), createCoreConnection()); + } +} \ No newline at end of file