diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 215c77f10e..5512e0e680 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -181,6 +181,8 @@ public class AmqpCoreConverter { populateMessage(result, message.getProtonMessage()); result.getInnerMessage().setReplyTo(message.getReplyTo()); + result.getInnerMessage().setDurable(message.isDurable()); + result.getInnerMessage().setPriority(message.getPriority()); result.encode(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index 68a9801903..91f6ca8a4f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -47,6 +47,125 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class); + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Test(timeout = 30000) + public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + testDeliveryMode(connection, connection2); + } + + @Test(timeout = 30000) + public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + testDeliveryMode(connection, connection2); + } + + @Test(timeout = 30000) + public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + testDeliveryMode(connection, connection2); + } + + @Test(timeout = 30000) + public void testDeliveryModeCoreProducerCoreConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + testDeliveryMode(connection, connection2); + } + + private void testDeliveryMode(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue1 = session1.createQueue(getQueueName()); + javax.jms.Queue queue2 = session2.createQueue(getQueueName()); + + final MessageConsumer consumer2 = session2.createConsumer(queue2); + + MessageProducer producer = session1.createProducer(queue1); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message received = consumer2.receive(100); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode()); + } finally { + connection1.close(); + connection2.close(); + } + } + + @Test(timeout = 30000) + public void testPriorityAMQPProducerCoreConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + testPriority(connection, connection2); + } + + @Test(timeout = 30000) + public void testPriorityAMQPProducerAMQPConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + testPriority(connection, connection2); + } + + @Test(timeout = 30000) + public void testPriorityModeCoreProducerAMQPConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + testPriority(connection, connection2); + } + + @Test(timeout = 30000) + public void testPriorityCoreProducerCoreConsumer() throws Exception { + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + testPriority(connection, connection2); + } + + private void testPriority(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue1 = session1.createQueue(getQueueName()); + javax.jms.Queue queue2 = session2.createQueue(getQueueName()); + + final MessageConsumer consumer2 = session2.createConsumer(queue2); + + MessageProducer producer = session1.createProducer(queue1); + producer.setPriority(2); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message received = consumer2.receive(100); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + assertEquals(2, received.getJMSPriority()); + } finally { + connection1.close(); + connection2.close(); + } + } + @Test(timeout = 60000) public void testSelector() throws Exception { Connection connection = createConnection();