This commit is contained in:
Timothy Bish 2017-09-12 09:44:44 -04:00
commit 04a585ff85
2 changed files with 121 additions and 0 deletions

View File

@ -181,6 +181,8 @@ public class AmqpCoreConverter {
populateMessage(result, message.getProtonMessage());
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.encode();

View File

@ -47,6 +47,125 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class);
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test(timeout = 30000)
public void testDeliveryModeAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeCoreProducerAMQPConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testDeliveryMode(connection, connection2);
}
@Test(timeout = 30000)
public void testDeliveryModeCoreProducerCoreConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testDeliveryMode(connection, connection2);
}
private void testDeliveryMode(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testPriorityAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityAMQPProducerAMQPConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityModeCoreProducerAMQPConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testPriority(connection, connection2);
}
@Test(timeout = 30000)
public void testPriorityCoreProducerCoreConsumer() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testPriority(connection, connection2);
}
private void testPriority(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue1 = session1.createQueue(getQueueName());
javax.jms.Queue queue2 = session2.createQueue(getQueueName());
final MessageConsumer consumer2 = session2.createConsumer(queue2);
MessageProducer producer = session1.createProducer(queue1);
producer.setPriority(2);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message received = consumer2.receive(100);
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
assertEquals(2, received.getJMSPriority());
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 60000)
public void testSelector() throws Exception {
Connection connection = createConnection();