From ee53b2bf53bcfac079780ff9cf81507529c74636 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Mon, 11 Sep 2017 23:15:36 +0100 Subject: [PATCH] ARTEMIS-1413: FIX JMSDeliveryMode/Priority on AMQP to CORE conversion Added test case for cross protocol on JMSDeliveryMode proving issue, and asserting fix Added fix to AmqpCoreConverter to ensure durability (JMSDeliveryMode) is retained. Similar issue spotted with JMSPriority as with JMSDeliveyMode, fixing at the same time. Added extra test case for jmspriority Added fix for jmspriority --- .../amqp/converter/AmqpCoreConverter.java | 2 + .../amqp/JMSMessageConsumerTest.java | 119 ++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 215c77f10e..5512e0e680 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -181,6 +181,8 @@ public class AmqpCoreConverter { populateMessage(result, message.getProtonMessage()); result.getInnerMessage().setReplyTo(message.getReplyTo()); + result.getInnerMessage().setDurable(message.isDurable()); + result.getInnerMessage().setPriority(message.getPriority()); result.encode(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index 68a9801903..91f6ca8a4f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -47,6 +47,125 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class); + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Test(timeout = 30000) + public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + testDeliveryMode(connection, connection2); + } + + @Test(timeout = 30000) + public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + testDeliveryMode(connection, connection2); + } + + @Test(timeout = 30000) + public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + testDeliveryMode(connection, connection2); + } + + @Test(timeout = 30000) + public void testDeliveryModeCoreProducerCoreConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + testDeliveryMode(connection, connection2); + } + + private void testDeliveryMode(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue1 = session1.createQueue(getQueueName()); + javax.jms.Queue queue2 = session2.createQueue(getQueueName()); + + final MessageConsumer consumer2 = session2.createConsumer(queue2); + + MessageProducer producer = session1.createProducer(queue1); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message received = consumer2.receive(100); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode()); + } finally { + connection1.close(); + connection2.close(); + } + } + + @Test(timeout = 30000) + public void testPriorityAMQPProducerCoreConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + testPriority(connection, connection2); + } + + @Test(timeout = 30000) + public void testPriorityAMQPProducerAMQPConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + testPriority(connection, connection2); + } + + @Test(timeout = 30000) + public void testPriorityModeCoreProducerAMQPConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + testPriority(connection, connection2); + } + + @Test(timeout = 30000) + public void testPriorityCoreProducerCoreConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + testPriority(connection, connection2); + } + + private void testPriority(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue1 = session1.createQueue(getQueueName()); + javax.jms.Queue queue2 = session2.createQueue(getQueueName()); + + final MessageConsumer consumer2 = session2.createConsumer(queue2); + + MessageProducer producer = session1.createProducer(queue1); + producer.setPriority(2); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message received = consumer2.receive(100); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + assertEquals(2, received.getJMSPriority()); + } finally { + connection1.close(); + connection2.close(); + } + } + @Test(timeout = 60000) public void testSelector() throws Exception { Connection connection = createConnection();