ARTEMIS-1413: FIX JMSDeliveryMode/Priority on AMQP to CORE conversion
Added test case for cross protocol on JMSDeliveryMode proving issue, and asserting fix Added fix to AmqpCoreConverter to ensure durability (JMSDeliveryMode) is retained. Similar issue spotted with JMSPriority as with JMSDeliveyMode, fixing at the same time. Added extra test case for jmspriority Added fix for jmspriority
This commit is contained in:
parent
4871feac98
commit
ee53b2bf53
|
@ -181,6 +181,8 @@ public class AmqpCoreConverter {
|
||||||
|
|
||||||
populateMessage(result, message.getProtonMessage());
|
populateMessage(result, message.getProtonMessage());
|
||||||
result.getInnerMessage().setReplyTo(message.getReplyTo());
|
result.getInnerMessage().setReplyTo(message.getReplyTo());
|
||||||
|
result.getInnerMessage().setDurable(message.isDurable());
|
||||||
|
result.getInnerMessage().setPriority(message.getPriority());
|
||||||
|
|
||||||
result.encode();
|
result.encode();
|
||||||
|
|
||||||
|
|
|
@ -47,6 +47,125 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getConfiguredProtocols() {
|
||||||
|
return "AMQP,OPENWIRE,CORE";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception {
|
||||||
|
Connection connection = createConnection(); //AMQP
|
||||||
|
Connection connection2 = createCoreConnection(); //CORE
|
||||||
|
testDeliveryMode(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception {
|
||||||
|
Connection connection = createConnection(); //AMQP
|
||||||
|
Connection connection2 = createConnection(); //AMQP
|
||||||
|
testDeliveryMode(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception {
|
||||||
|
Connection connection = createCoreConnection(); //CORE
|
||||||
|
Connection connection2 = createConnection(); //AMQP
|
||||||
|
testDeliveryMode(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testDeliveryModeCoreProducerCoreConsumer() throws Exception {
|
||||||
|
Connection connection = createCoreConnection(); //CORE
|
||||||
|
Connection connection2 = createCoreConnection(); //CORE
|
||||||
|
testDeliveryMode(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testDeliveryMode(Connection connection1, Connection connection2) throws JMSException {
|
||||||
|
try {
|
||||||
|
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
|
||||||
|
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
|
||||||
|
|
||||||
|
final MessageConsumer consumer2 = session2.createConsumer(queue2);
|
||||||
|
|
||||||
|
MessageProducer producer = session1.createProducer(queue1);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
connection1.start();
|
||||||
|
|
||||||
|
TextMessage message = session1.createTextMessage();
|
||||||
|
message.setText("hello");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
Message received = consumer2.receive(100);
|
||||||
|
|
||||||
|
assertNotNull("Should have received a message by now.", received);
|
||||||
|
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
|
||||||
|
assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
|
||||||
|
} finally {
|
||||||
|
connection1.close();
|
||||||
|
connection2.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testPriorityAMQPProducerCoreConsumer() throws Exception {
|
||||||
|
Connection connection = createConnection(); //AMQP
|
||||||
|
Connection connection2 = createCoreConnection(); //CORE
|
||||||
|
testPriority(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testPriorityAMQPProducerAMQPConsumer() throws Exception {
|
||||||
|
Connection connection = createConnection(); //AMQP
|
||||||
|
Connection connection2 = createConnection(); //AMQP
|
||||||
|
testPriority(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testPriorityModeCoreProducerAMQPConsumer() throws Exception {
|
||||||
|
Connection connection = createCoreConnection(); //CORE
|
||||||
|
Connection connection2 = createConnection(); //AMQP
|
||||||
|
testPriority(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testPriorityCoreProducerCoreConsumer() throws Exception {
|
||||||
|
Connection connection = createCoreConnection(); //CORE
|
||||||
|
Connection connection2 = createCoreConnection(); //CORE
|
||||||
|
testPriority(connection, connection2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testPriority(Connection connection1, Connection connection2) throws JMSException {
|
||||||
|
try {
|
||||||
|
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
|
||||||
|
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
|
||||||
|
|
||||||
|
final MessageConsumer consumer2 = session2.createConsumer(queue2);
|
||||||
|
|
||||||
|
MessageProducer producer = session1.createProducer(queue1);
|
||||||
|
producer.setPriority(2);
|
||||||
|
connection1.start();
|
||||||
|
|
||||||
|
TextMessage message = session1.createTextMessage();
|
||||||
|
message.setText("hello");
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
Message received = consumer2.receive(100);
|
||||||
|
|
||||||
|
assertNotNull("Should have received a message by now.", received);
|
||||||
|
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
|
||||||
|
assertEquals(2, received.getJMSPriority());
|
||||||
|
} finally {
|
||||||
|
connection1.close();
|
||||||
|
connection2.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testSelector() throws Exception {
|
public void testSelector() throws Exception {
|
||||||
Connection connection = createConnection();
|
Connection connection = createConnection();
|
||||||
|
|
Loading…
Reference in New Issue